In [None]:
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import LongType, NumericType

# Single SparkSession shared by every cell; getOrCreate() reuses an active one.
spark = (
    SparkSession.builder
    .appName("Moviedata")
    .getOrCreate()
)

# User purchase data

In [None]:
# Raw purchase data: first row is the header, column types are inferred.
Userdf = spark.read.csv("user_purchase.csv", header=True, inferSchema=True)

In [None]:
# Null census: one output column per input column, holding its null count.
# The when() value is only a non-null placeholder, so count() tallies exactly
# the rows where the column is null.
null_count_exprs = [
    F.count(F.when(F.isnull(col_name), col_name)).alias(col_name)
    for col_name in Userdf.columns
]
Userdf.select(null_count_exprs).show()

In [None]:
# Discard every row containing a null in any column.
dropped_null_df = Userdf.na.drop()

In [None]:
# Repeat the null census on the null-dropped frame; all counts are expected
# to be 0 after dropping null rows.
remaining_null_exprs = [
    F.count(F.when(F.isnull(col_name), col_name)).alias(col_name)
    for col_name in dropped_null_df.columns
]
dropped_null_df.select(remaining_null_exprs).show()

In [None]:
# Keep a single copy of each fully identical row.
new_df = dropped_null_df.drop_duplicates()

In [None]:
# Count negative values in every NUMERIC column of the deduplicated frame.
# Comparing text/date columns with 0 is meaningless and can fail with a
# cast/type error (e.g. under ANSI mode, the default in Spark 4.x), so only
# NumericType columns are checked. All counts come from a single aggregation
# instead of launching one Spark job per column.
numeric_cols = [
    field.name
    for field in new_df.schema.fields
    if isinstance(field.dataType, NumericType)
]
if numeric_cols:
    neg_counts = new_df.agg(
        *[F.count(F.when(F.col(c) < 0, True)).alias(c) for c in numeric_cols]
    ).first()
    for column in numeric_cols:
        print(f"{column}-\n\t Negative values: {neg_counts[column]}")

In [None]:
# Keep only rows whose Quantity and UnitPrice are both non-negative.
non_negative = (F.col("Quantity") >= 0) & (F.col("UnitPrice") >= 0)
df_without_negative_values = new_df.filter(non_negative)

# NOTE: a Quantity of 0 is probably not a valid purchase either. If zero-quantity
# rows should also be removed, change the Quantity condition above from >= 0 to > 0.

In [None]:
# Re-check negative values on the filtered frame, NUMERIC columns only.
# Comparing text/date columns with 0 is meaningless and can fail with a
# cast/type error (e.g. under ANSI mode, the default in Spark 4.x). All counts
# are computed in one aggregation rather than one Spark job per column.
numeric_cols = [
    field.name
    for field in df_without_negative_values.schema.fields
    if isinstance(field.dataType, NumericType)
]
if numeric_cols:
    neg_counts = df_without_negative_values.agg(
        *[F.count(F.when(F.col(c) < 0, True)).alias(c) for c in numeric_cols]
    ).first()
    for column in numeric_cols:
        print(f"{column}-\n\t Negative values: {neg_counts[column]}")

In [None]:
# Persist the cleaned purchases. Spark writes a *directory* with this name
# containing part files. mode("overwrite") keeps the notebook re-runnable:
# the default save mode raises an error when the path already exists.
(
    df_without_negative_values.write
    .mode("overwrite")
    .option("header", True)
    .csv("user_purchase_clean.csv")
)

# Movie Data

In [None]:
# Raw movie reviews: first row is the header, column types are inferred.
moviedf = spark.read.csv("movie_review.csv", header=True, inferSchema=True)

In [None]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.sql.functions import *

# Split each review_str into a list of word tokens stored in review_token.
tokenizer = Tokenizer(outputCol="review_token", inputCol="review_str")
moviedf = tokenizer.transform(moviedf)

In [None]:
# Drop stop words from the token lists; the remaining tokens go to "filtered".
remover = StopWordsRemover(outputCol="filtered", inputCol="review_token")
moviedf = remover.transform(moviedf)

In [None]:
# Flag reviews whose filtered tokens contain the word "good", then stamp every
# row with the current timestamp in insert_column.
moviedf = (
    moviedf
    .withColumn("positive_review", F.array_contains(moviedf["filtered"], "good"))
    .withColumn("insert_column", F.current_timestamp())
)

In [None]:
# Converting the boolean positive_review flag to an integer: true -> 1, false -> 0.
# The boolean column is tested directly instead of being compared with the
# string "true", which only worked through an implicit string-to-boolean cast.
# A null flag falls through to otherwise(0), as before.
moviedf = moviedf.withColumn(
    "positive_review_int", F.when(F.col("positive_review"), 1).otherwise(0)
)

# Dropping the text, token and intermediate columns so only the other input
# columns plus positive_review_int remain.
moviedf = moviedf.drop(
    "review_str", "review_token", "filtered", "positive_review", "insert_column"
)

# New clean file (a directory of part files). Overwrite so the notebook can be
# re-run; the default save mode errors when the path already exists.
moviedf.write.mode("overwrite").option("header", True).csv("movie_review_clean.csv")

# Log Data

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType
import xml.etree.ElementTree as ET

logdf = spark.read.option("header", "true").csv("log_reviews.csv")

# Schema of one parsed <log> record; every field is kept as a string.
xmlSchema = StructType(
    [
        StructField("logDate", StringType(), True),
        StructField("device", StringType(), True),
        StructField("location", StringType(), True),
        StructField("os", StringType(), True),
        StructField("ipAddress", StringType(), True),
        StructField("phoneNumber", StringType(), True),
    ]
)

# RDD of the raw XML strings from the "log" column, built directly from the
# DataFrame. A collect() + parallelize() round trip would pull the entire
# column through driver memory for no benefit; this keeps the work on the
# executors. Null / blank cells are skipped so the XML parser never receives
# None or an empty document.
xml_rdd = (
    logdf.select("log")
    .rdd.map(lambda row: row[0])
    .filter(lambda xml_string: xml_string is not None and xml_string.strip() != "")
)


def parse_xml(xml_string):
    """Parse one XML document into a list of log-record dicts.

    Every direct ``<log>`` child of the document root becomes one dict with the
    keys logDate, device, location, os, ipAddress and phoneNumber (the fields of
    ``xmlSchema``). A missing child tag, or one with no text, yields ``None``
    for that key instead of raising ``AttributeError``.

    Args:
        xml_string: the XML text of one cell of the "log" column; ``None`` or an
            empty/blank string is treated as "no records".

    Returns:
        A list of dicts, one per ``<log>`` element (empty if there are none).

    Raises:
        xml.etree.ElementTree.ParseError: if ``xml_string`` is malformed XML.
    """
    if xml_string is None or not xml_string.strip():
        return []

    fields = ("logDate", "device", "location", "os", "ipAddress", "phoneNumber")
    root = ET.fromstring(xml_string)
    logs = []
    for log in root.findall("log"):
        log_dict = {}
        for field in fields:
            child = log.find(field)
            # find() returns None for a missing tag; .text is None for an empty one.
            log_dict[field] = child.text if child is not None else None
        logs.append(log_dict)
    return logs

# Apply parse_xml to each XML string, flattening into one dict per <log> record.
parsed_xml_rdd = xml_rdd.flatMap(parse_xml)

# Attach a consecutive, 1-based id_review to every parsed record.
# monotonically_increasing_id() only guarantees unique, increasing values:
# it encodes the partition id in the upper bits, so ids jump between
# partitions and cannot serve as a row number. zipWithIndex() numbers records
# 0..N-1 in partition order, so +1 gives 1..N.
# NOTE(review): if log_reviews.csv already carries its own id_review column,
# that value should probably be propagated instead — confirm against the data.
id_schema = StructType(
    xmlSchema.fields + [StructField("id_review", LongType(), False)]
)
indexed_rdd = parsed_xml_rdd.zipWithIndex().map(
    lambda pair: {**pair[0], "id_review": pair[1] + 1}
)
df = spark.createDataFrame(indexed_rdd, schema=id_schema)
df.show()

# Overwrite so the notebook can be re-run; the default save mode errors when
# the output path already exists.
df.write.mode("overwrite").option("header", True).csv("log_reviews_clean.csv")


In [None]:
def _read_clean_csv(path):
    """Load one of the cleaned CSV outputs (header row, all columns as strings)."""
    return spark.read.csv(path, header=True)


# Read each cleaned output back and preview its first 10 rows.
new_movie = _read_clean_csv("movie_review_clean.csv")
new_movie.show(10)

new_log = _read_clean_csv("log_reviews_clean.csv")
new_log.show(10)

new_user = _read_clean_csv("user_purchase_clean.csv")
new_user.show(10)