In [1]:
# 1

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("DataAnalysis").getOrCreate()

# Load the datasets
train_df = spark.read.csv("/content/test.csv", header=True, inferSchema=True)
test_df = spark.read.csv("/content/test.csv", header=True, inferSchema=True)

# Display the schema
train_df.printSchema()

root
 |-- 2: integer (nullable = true)
 |-- Great CD: string (nullable = true)
 |-- "My lovely Pat has one of the GREAT voices of her generation. I have listened to this CD for YEARS and I still LOVE IT. When I'm in a good mood it makes me feel better. A bad mood just evaporates like sugar in the rain. This CD just oozes LIFE. Vocals are jusat STUUNNING and lyrics just kill. One of life's hidden gems. This is a desert isle CD in my book. Why she never made it big is just beyond me. Everytime I play this, no matter black, white, young, old, male, female EVERYBODY says one thing ""Who was that singing ?""": string (nullable = true)



In [2]:
# Rename the problematic column
# Get the list of column names
old_columns = train_df.columns

# Assuming the problematic column is the first one (index 0), replace it with a new name
# Adjust the index if it's a different column
new_columns = ['review_text' if col == old_columns[0] else col for col in old_columns]

# Rename the columns in the DataFrame
train_df = train_df.toDF(*new_columns)


In [3]:
# Show the first 10 rows
train_df.show(10)

+-----------+--------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|review_text|            Great CD|"My lovely Pat has one of the GREAT voices of her generation. I have listened to this CD for YEARS and I still LOVE IT. When I'm in a good mood it makes me feel better. A bad mood just evaporates like sugar in the rain. This CD just oozes LIFE. Vocals are jusat STUUNNING and lyrics just kill. One of life's hidden gems. This is a desert isle CD in my book. Why she never made it big is just beyond me. 

In [4]:
# Count the total number of rows
train_count = train_df.count()
print(f"Total rows in train dataset: {train_count}")

Total rows in train dataset: 399999


In [6]:
# Ensure partitioning for distributed processing (if applicable)
train_df = train_df.repartition(4)  # Adjust partitions based on dataset size


In [7]:
from pyspark.sql.functions import col

old_columns = train_df.columns

# Assuming the problematic column is the first one (index 0), replace it with a new name
# Adjust the index if it's a different column
new_columns = ['review_text' if col == old_columns[0] else col for col in old_columns]

# Rename the columns in the DataFrame
train_df = train_df.toDF(*new_columns)

In [8]:
# Remove duplicates
train_df = train_df.dropDuplicates()

# Debugging: Print column names and schema
print("Columns:", train_df.columns)
train_df.printSchema()


Columns: ['review_text', 'Great CD', '"My lovely Pat has one of the GREAT voices of her generation. I have listened to this CD for YEARS and I still LOVE IT. When I\'m in a good mood it makes me feel better. A bad mood just evaporates like sugar in the rain. This CD just oozes LIFE. Vocals are jusat STUUNNING and lyrics just kill. One of life\'s hidden gems. This is a desert isle CD in my book. Why she never made it big is just beyond me. Everytime I play this, no matter black, white, young, old, male, female EVERYBODY says one thing ""Who was that singing ?"""']
root
 |-- review_text: integer (nullable = true)
 |-- Great CD: string (nullable = true)
 |-- "My lovely Pat has one of the GREAT voices of her generation. I have listened to this CD for YEARS and I still LOVE IT. When I'm in a good mood it makes me feel better. A bad mood just evaporates like sugar in the rain. This CD just oozes LIFE. Vocals are jusat STUUNNING and lyrics just kill. One of life's hidden gems. This is a deser

In [9]:

# Convert columns to appropriate data types
# Example: Convert a column to integer
train_df = train_df.withColumn("Great CD", col("`Great CD`").cast("string"))

# Filter out invalid rows (e.g., where column values do not meet specific conditions)
train_df = train_df.filter(col("review_text") > 0)

In [10]:
# Normalize/scale numeric columns if necessary (example using a custom UDF)
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType

# Example: custom UDF to scale a column
@udf(FloatType())
def scale_column(value):
    # Implement scaling formula here
    return value / 100.0  # Example scaling by dividing by 100

train_df = train_df.withColumn("scaled_column", scale_column(col("review_text")))


In [11]:
# Register the DataFrame as a temporary view
train_df.createOrReplaceTempView("train_view")

In [12]:
# Aggregation query: Summary statistics
spark.sql("SELECT AVG(scaled_column), STDDEV(scaled_column) FROM train_view").show()


+--------------------+---------------------+
|  avg(scaled_column)|stddev(scaled_column)|
+--------------------+---------------------+
|0.015000037165191792| 0.005000006138190...|
+--------------------+---------------------+



In [13]:
# Grouping and filtering query
spark.sql("SELECT `Great CD`, AVG(scaled_column) FROM train_view GROUP BY `Great CD`").show()

+--------------------+--------------------+
|            Great CD|  avg(scaled_column)|
+--------------------+--------------------+
|         It's a Gift|0.019999999552965164|
|             Amazing| 0.01994999955408275|
|the best barbie i...|0.019999999552965164|
|toooo cute...and ...|0.009999999776482582|
|Chapters show how...|0.019999999552965164|
| Excellent Reference|0.019999999552965164|
|    Arrived damaged.|0.009999999776482582|
|Too Limited in it...|0.009999999776482582|
|What's Up with my...|0.009999999776482582|
|Silly, Obnoxious ...|0.009999999776482582|
|             rip-off|0.009999999776482582|
|              Rehash|0.009999999776482582|
|the mike leg is t...|0.009999999776482582|
|  Unreliable Product|0.009999999776482582|
|           Surprise!|0.019999999552965164|
|       One of a Kind|0.019999999552965164|
|Great Collection ...|0.019999999552965164|
|Not overly thrilling|0.009999999776482582|
|             Skip it|0.009999999776482582|
|  Didn't work at all|0.00999999

In [14]:
# Join example (if applicable)
# Example: spark.sql("SELECT * FROM train_view t1 JOIN other_table t2 ON t1.key = t2.key")

# Time-based analysis (if timestamp exists)
# Example for daily aggregation
spark.sql("SELECT date_trunc('day', current_timestamp()) as day, AVG(scaled_column) FROM train_view GROUP BY day").show()

# Insights: Extract patterns and document findings



+-------------------+--------------------+
|                day|  avg(scaled_column)|
+-------------------+--------------------+
|2024-11-25 00:00:00|0.015000037165191792|
+-------------------+--------------------+



In [25]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import RegressionEvaluator, MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer

In [26]:
# Define feature columns and the label
feature_columns = ["review_text", "scaled_column"]  # Replace with existing features from train_data
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")



In [27]:
# Split data
train_data, test_data = train_df.randomSplit([0.8, 0.2], seed=1234)


In [28]:
# Choose model type
# Regression
model = LinearRegression(featuresCol="features", labelCol="Great CD")



In [29]:
# Classification example
# model = LogisticRegression(featuresCol="features", labelCol="target_column")

# Create a pipeline
pipeline = Pipeline(stages=[assembler, model])



In [30]:
from pyspark.sql.functions import col
train_data = train_data.withColumn("Great CD", col("Great CD").cast("double"))


In [31]:
from pyspark.sql.functions import when
train_data = train_data.withColumn(
    "Great CD",
    when(col("Great CD").cast("double").isNotNull(), col("Great CD").cast("double")).otherwise(0)
)


In [32]:
# Fit the model
trained_model = pipeline.fit(train_data)

# Evaluate the model
predictions = trained_model.transform(test_data)


In [35]:
predictions.printSchema()


root
 |-- review_text: integer (nullable = true)
 |-- Great CD: string (nullable = true)
 |-- "My lovely Pat has one of the GREAT voices of her generation. I have listened to this CD for YEARS and I still LOVE IT. When I'm in a good mood it makes me feel better. A bad mood just evaporates like sugar in the rain. This CD just oozes LIFE. Vocals are jusat STUUNNING and lyrics just kill. One of life's hidden gems. This is a desert isle CD in my book. Why she never made it big is just beyond me. Everytime I play this, no matter black, white, young, old, male, female EVERYBODY says one thing ""Who was that singing ?""": string (nullable = true)
 |-- scaled_column: float (nullable = true)
 |-- features: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [38]:
# Regression evaluation
evaluator = RegressionEvaluator(labelCol="scaled_column", predictionCol="prediction", metricName="rmse")

predictions.show(5)


+-----------+--------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------+--------------------+-------------------+
|review_text|            Great CD|"My lovely Pat has one of the GREAT voices of her generation. I have listened to this CD for YEARS and I still LOVE IT. When I'm in a good mood it makes me feel better. A bad mood just evaporates like sugar in the rain. This CD just oozes LIFE. Vocals are jusat STUUNNING and lyrics just kill. One of life's hidden gems. This is a desert isle CD in

In [39]:
rmse = evaluator.evaluate(predictions)
print(f"RMSE: {rmse}")

RMSE: 9.312639950520862e+24


In [41]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Set up hyperparameter grid
paramGrid = ParamGridBuilder().addGrid(model.regParam, [0.1, 0.01]).addGrid(model.elasticNetParam, [0.8, 0.5]).build()


In [42]:
# Set up cross-validation
crossval = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=3)


In [43]:
# Run cross-validation and select the best model
cv_model = crossval.fit(train_data)


In [44]:
# Evaluate the best model on test data
final_predictions = cv_model.transform(test_data)
final_rmse = evaluator.evaluate(final_predictions)
print(f"Final RMSE on test data: {final_rmse}")


Final RMSE on test data: 0.015162731255109056
