In [None]:
import os

# Authenticate to the ADLS Gen2 storage account with its access key.
# The key is read from the AZURE_STORAGE_ACCOUNT_KEY environment variable
# instead of being pasted into the notebook, so it never ends up in source
# control or in shared/exported copies. (On Databricks,
# dbutils.secrets.get(scope=..., key=...) is the usual alternative.)
# If the variable is not set, this cell fails immediately with a KeyError.
STORAGE_ACCOUNT = "moviesdatastorageacc224"

spark.conf.set(
    f"fs.azure.account.key.{STORAGE_ACCOUNT}.dfs.core.windows.net",
    os.environ["AZURE_STORAGE_ACCOUNT_KEY"],
)

# Now try accessing the file system
display(
    dbutils.fs.ls(f"abfss://moviesdata@{STORAGE_ACCOUNT}.dfs.core.windows.net")
)

path,name,size,modificationTime
abfss://moviesdata@moviesdatastorageacc224.dfs.core.windows.net/critic_reviews.csv,critic_reviews.csv,910245,1721885129000
abfss://moviesdata@moviesdatastorageacc224.dfs.core.windows.net/movies.csv,movies.csv,6894,1721885128000
abfss://moviesdata@moviesdatastorageacc224.dfs.core.windows.net/user_reviews.csv,user_reviews.csv,20041490,1721885112000


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

# Obtain the Spark session for this notebook. getOrCreate() returns the
# already-active session when one exists, otherwise it builds a new one
# named "Best Soccer Movies".
spark = (
    SparkSession.builder
    .appName("Best Soccer Movies")
    .getOrCreate()
)

spark

In [None]:
# Load the movies catalogue from ADLS.
#   header=True      -> first line holds the column names
#   inferSchema=True -> Spark samples the data to choose column types
#   mode=FAILFAST    -> abort on the first malformed row instead of hiding it
moviesLoadDF = (
    spark.read
    .options(header=True, inferSchema=True, mode="FAILFAST")
    .csv("abfss://moviesdata@moviesdatastorageacc224.dfs.core.windows.net/movies.csv")
)

moviesLoadDF.printSchema()
moviesLoadDF.show(5)

root
 |-- movieId: string (nullable = true)
 |-- movieTitle: string (nullable = true)
 |-- movieYear: integer (nullable = true)
 |-- movieURL: string (nullable = true)
 |-- movieRank: integer (nullable = true)
 |-- critic_score: string (nullable = true)
 |-- audience_score: string (nullable = true)

+--------------------+--------------------+---------+--------------------+---------+------------+--------------+
|             movieId|          movieTitle|movieYear|            movieURL|movieRank|critic_score|audience_score|
+--------------------+--------------------+---------+--------------------+---------+------------+--------------+
|144d8fde-a175-3e4...|             Offside|     2006|https://www.rotte...|        1|         94%|           74%|
|91217a2b-9d80-345...|   The Damned United|     2009|https://www.rotte...|        2|         92%|           85%|
|cfd33721-a653-43f...|      Diego Maradona|     2019|https://www.rotte...|        3|         90%|          NULL|
|3da46dee-cea6-3eb...

# Transforming and Filtering the dataset "**movies.csv**"
- The movies data has some null values in the "**_critic_score_**" and "**_audience_score_**" columns, so we filter those rows out with the `filter` function.
- The "**_critic_score_**" and "**_audience_score_**" columns are strings (e.g. "94%"), so ordering them sorts the text lexicographically rather than numerically (in descending order "94%" comes before "100%"), which is not the expected result.
- So, we first remove the "%" symbol and then cast each column to "**integer**".
- The original ranking column ("**_movieRank_**") is not based on any particular value or column, so we drop it and rank the movies by the "**_critic_score_**" column instead.

In [None]:
# Keep the columns we need (movieRank is dropped: the source ranking is not
# based on any score) and turn "94%"-style strings into integers so they sort
# numerically rather than lexicographically.
# No sort is applied here: the ranking window below defines the order.
moviesDF = (
    moviesLoadDF
    .select(col("movieId"),
            col("movieTitle"),
            col("movieYear"),
            col("movieURL"),
            col("critic_score"),
            col("audience_score"))
    .withColumn("critic_score_numeric", regexp_replace(col("critic_score"), "%", "").cast("int"))
    .withColumn("audience_score_numeric", regexp_replace(col("audience_score"), "%", "").cast("int"))
    # Filtering on the numeric columns drops missing scores and, when
    # spark.sql.ansi.enabled is false, any score that is not a plain
    # percentage (the cast yields NULL for those).
    .filter(col("critic_score_numeric").isNotNull() & col("audience_score_numeric").isNotNull())
)

#Ranking the movies based on critics score
# row_number() numbers tied rows in an arbitrary order, and many movies share
# the same critic score (e.g. 100%). Ties are therefore broken explicitly:
# audience score first, then title and id, so the ranking is deterministic.
# A window without partitionBy pulls all rows into one partition, which is
# acceptable for this small (~7 KB) file.
windowSpec = Window.orderBy(
    col("critic_score_numeric").desc(),
    col("audience_score_numeric").desc(),
    col("movieTitle").asc(),
    col("movieId").asc(),
)

rankedDF = (
    moviesDF
    .withColumn("Rank", row_number().over(windowSpec))
    .orderBy(col("Rank"))
)

rankedDF.show(10)

+--------------------+--------------------+---------+--------------------+------------+--------------+--------------------+----------------------+----+
|             movieId|          movieTitle|movieYear|            movieURL|critic_score|audience_score|critic_score_numeric|audience_score_numeric|Rank|
+--------------------+--------------------+---------+--------------------+------------+--------------+--------------------+----------------------+----+
|58dd168c-4047-38e...|    The Class of '92|     2013|https://www.rotte...|        100%|           85%|                 100|                    85|   1|
|4966301c-ec9e-363...|      Next Goal Wins|     2014|https://www.rotte...|        100%|           89%|                 100|                    89|   2|
|9ae2a295-1697-344...|               Mario|     2018|https://www.rotte...|        100%|           69%|                 100|                    69|   3|
|385149c7-d935-4ad...|Sir Alex Ferguson...|     2021|https://www.rotte...|        100%| 

In [None]:
#Writing the result to Azure Data Lake Storage
# Spark writes a directory of part files at this path (the ".csv" suffix is
# only part of the directory name). The header row keeps the column names in
# the output so it can be read back without an external schema.
output_path = "abfss://moviesoutputdata@moviesdatastorageacc224.dfs.core.windows.net/moviesdata.csv"

(
    rankedDF.write
    .option("header", True)
    .csv(output_path, mode="overwrite")
)

# Using read modes to identify "Malformed" data
- While reading the dataset "**_critic_reviews.csv_**", I first used the "**_FAILFAST_**" read mode:

```python
criticReviewLoadDF = spark.read\
    .option("header",True)\
    .option("inferSchema",True)\
    .option("mode","FAILFAST")\
    .csv("abfss://moviesdata@moviesdatastorageacc224.dfs.core.windows.net/critic_reviews.csv")
```

- When I ran this code it failed, which indicates that the dataset contains some malformed records.
- At this stage we can use either the "**_DROPMALFORMED_**" or the "**_PERMISSIVE_**" read mode. DROPMALFORMED silently discards malformed rows, so we would not know which rows or values were malformed. PERMISSIVE keeps every row, stores the raw text of each malformed row in a dedicated column and sets the fields it could not parse to null; if we don't need that column afterwards, we can simply drop it.

In [None]:
# Explicit schema for critic_reviews.csv. The trailing _corrupt_records column
# does not exist in the file: in PERMISSIVE mode Spark stores the raw text of
# any row it cannot parse there (see columnNameOfCorruptRecord below) and sets
# the fields it could not parse to NULL.
criticSchema = StructType([
    StructField("reviewId", IntegerType(), True),
    StructField("creationDate", DateType(), True),
    StructField("criticName", StringType(), True),
    StructField("criticPageUrl", StringType(), True),
    StructField("reviewState", StringType(), True),
    StructField("isFresh", BooleanType(), True),
    StructField("isRotten", BooleanType(), True),
    StructField("isRtUrl", BooleanType(), True),
    StructField("isTopCritic", BooleanType(), True),
    StructField("publicationUrl", StringType(), True),
    StructField("publicationName", StringType(), True),
    StructField("reviewUrl", StringType(), True),
    StructField("quote", StringType(), True),
    StructField("scoreSentiment", StringType(), True),
    StructField("originalScore", StringType(), True),
    StructField("movieId", StringType(), True),
    StructField("_corrupt_records",StringType(),True)
])

# inferSchema is intentionally not set: Spark ignores it when an explicit
# schema is supplied.
# NOTE(review): if review quotes contain line breaks inside quoted fields,
# .option("multiLine", True) would keep each review in a single record instead
# of reporting its fragments as corrupt — confirm against the raw file.
criticReviewLoadDF = (
    spark.read
    .schema(criticSchema)
    .option("header", True)
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "_corrupt_records")
    .csv("abfss://moviesdata@moviesdatastorageacc224.dfs.core.windows.net/critic_reviews.csv")
)

criticReviewLoadDF.printSchema()
criticReviewLoadDF.show(5)

root
 |-- reviewId: integer (nullable = true)
 |-- creationDate: date (nullable = true)
 |-- criticName: string (nullable = true)
 |-- criticPageUrl: string (nullable = true)
 |-- reviewState: string (nullable = true)
 |-- isFresh: boolean (nullable = true)
 |-- isRotten: boolean (nullable = true)
 |-- isRtUrl: boolean (nullable = true)
 |-- isTopCritic: boolean (nullable = true)
 |-- publicationUrl: string (nullable = true)
 |-- publicationName: string (nullable = true)
 |-- reviewUrl: string (nullable = true)
 |-- quote: string (nullable = true)
 |-- scoreSentiment: string (nullable = true)
 |-- originalScore: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- _corrupt_records: string (nullable = true)

+--------+------------+------------------+--------------------+-----------+-------+--------+-------+-----------+-------------------+--------------------+--------------------+--------------------+--------------+-------------+--------------------+----------------+
|rev

In [None]:
#Writing the result to Azure Data Lake Storage
# Export only the rows PERMISSIVE mode flagged as malformed (non-null
# _corrupt_records), so this folder holds just the records that need
# inspection rather than a copy of the whole dataset.
output_pathcorr = "abfss://moviesoutputdata@moviesdatastorageacc224.dfs.core.windows.net/catchingCorruptRecordCritics"

(
    criticReviewLoadDF
    .filter(col("_corrupt_records").isNotNull())
    .write
    .option("header", True)
    .csv(output_pathcorr, mode="overwrite")
)

In [None]:
# Project the critic-review columns used downstream (originalScore and the
# _corrupt_records diagnostic column are dropped). Keep only rows whose critic,
# URL and quote fields are all present.
# creationDate is used as-is: criticSchema already parses it as a DateType
# (with Spark's default yyyy-MM-dd pattern), so it needs no further to_date()
# conversion with a different string pattern.
criticReviewDF = (
    criticReviewLoadDF
    .select(
        col("reviewId"),
        col("movieId"),
        col("creationDate"),
        col("criticName"),
        col("criticPageUrl"),
        col("reviewState"),
        col("isFresh"),
        col("isRotten"),
        col("isRtUrl"),
        col("isTopCritic"),
        col("publicationUrl"),
        col("publicationName"),
        col("reviewUrl"),
        col("quote"),
        col("scoreSentiment"),
    )
    .filter(
        col("criticName").isNotNull()
        & col("criticPageUrl").isNotNull()
        & col("isRtUrl").isNotNull()
        & col("reviewUrl").isNotNull()
        & col("quote").isNotNull()
    )
)
criticReviewDF.show(10)

+--------+--------------------+------------+---------------+--------------------+-----------+-------+--------+-------+-----------+--------------------+--------------------+--------------------+--------------------+--------------+
|reviewId|             movieId|creationDate|     criticName|       criticPageUrl|reviewState|isFresh|isRotten|isRtUrl|isTopCritic|      publicationUrl|     publicationName|           reviewUrl|               quote|scoreSentiment|
+--------+--------------------+------------+---------------+--------------------+-----------+-------+--------+-------+-----------+--------------------+--------------------+--------------------+--------------------+--------------+
|  159558|1b7b7ed1-b91c-392...|  2000-01-01|    Roger Ebert|/critics/roger-ebert|      fresh|   true|   false|  false|       true|  /critics/source/67|   Chicago Sun-Times|http://www.rogere...|A delightful demo...|      POSITIVE|
|  159570|1b7b7ed1-b91c-392...|  2000-01-01|  Frank Swietek|/critics/frank-sw...

In [None]:
#Writing the result to Azure Data Lake Storage
# Spark writes a directory of part files at this path. The header row keeps
# the column names so the cleaned critic reviews are self-describing.
output_path1 = "abfss://moviesoutputdata@moviesdatastorageacc224.dfs.core.windows.net/criticsReview.csv"

(
    criticReviewDF.write
    .option("header", True)
    .csv(output_path1, mode="overwrite")
)

# Using read modes to identify "Malformed" data
- While reading the dataset "**_user_reviews.csv_**", I first used the "**_FAILFAST_**" read mode:

```python
userReviewLoadDF = spark.read\
    .option("header",True)\
    .option("inferSchema",True)\
    .option("mode","FAILFAST")\
    .csv("abfss://moviesdata@moviesdatastorageacc224.dfs.core.windows.net/user_reviews.csv")
```

- When I ran this code it failed, which indicates that the dataset contains some malformed records.
- At this stage we can use either the "**_DROPMALFORMED_**" or the "**_PERMISSIVE_**" read mode. DROPMALFORMED silently discards malformed rows, so we would not know which rows or values were malformed. PERMISSIVE keeps every row, stores the raw text of each malformed row in a dedicated column and sets the fields it could not parse to null; if we don't need that column afterwards, we can simply drop it.

In [None]:
# Explicit schema for user_reviews.csv. _corrupt_records is an extra column
# that receives the raw text of any row Spark cannot parse (PERMISSIVE mode).
userSchema = StructType([
    StructField("movieId", StringType(), True),
    StructField("rating", DoubleType(), True),
    StructField("quote", StringType(), True),
    StructField("reviewId", StringType(), True),
    StructField("isVerified", BooleanType(), True),
    StructField("isSuperReviewer", BooleanType(), True),
    StructField("hasSpoilers", BooleanType(), True),
    StructField("hasProfanity", BooleanType(), True),
    StructField("score", DoubleType(), True),
    StructField("creationDate", DateType(), True),
    StructField("userDisplayName", StringType(), True),
    StructField("userRealm", StringType(), True),
    StructField("userId", StringType(), True),
    StructField("_corrupt_records", StringType(), True)
])

# multiLine: some reviews contain line breaks inside quoted fields. Without it
# every physical line is parsed as its own record. The review is then reported
# as corrupt, and its continuation lines surface as bogus rows (e.g. a
# "movieId" that is a sentence fragment).
# inferSchema is intentionally not set: Spark ignores it when an explicit
# schema is supplied.
# NOTE(review): if embedded quotes are escaped by doubling them (""), also add
# .option("escape", '"') — Spark's default CSV escape character is a backslash.
userReviewsLoadDF = (
    spark.read
    .schema(userSchema)
    .option("header", True)
    .option("multiLine", True)
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "_corrupt_records")
    .csv("abfss://moviesdata@moviesdatastorageacc224.dfs.core.windows.net/user_reviews.csv")
)

userReviewsLoadDF.show(5)

+--------------------+------+--------------------+--------+----------+---------------+-----------+------------+-----+------------+---------------+---------+------+--------------------+
|             movieId|rating|               quote|reviewId|isVerified|isSuperReviewer|hasSpoilers|hasProfanity|score|creationDate|userDisplayName|userRealm|userId|    _corrupt_records|
+--------------------+------+--------------------+--------+----------+---------------+-----------+------------+-----+------------+---------------+---------+------+--------------------+
|55f6efc8-0f2f-3ca...|   4.0|[b]Bend it Like B...|    NULL|      NULL|           NULL|       NULL|        NULL| NULL|        NULL|           NULL|     NULL|  NULL|"55f6efc8-0f2f-3c...|
|Really liked it. ...|  NULL|                NULL|    NULL|      NULL|           NULL|       NULL|        NULL| NULL|        NULL|           NULL|     NULL|  NULL|Really liked it. ...|
|Emailed my Prof r...|  NULL|                NULL|    NULL|      NULL|     

In [None]:
#Writing the result to Azure Data Lake Storage
# Export only the rows PERMISSIVE mode flagged as malformed (non-null
# _corrupt_records). repartition(1) yields a single part file.
output_pathcorrU = "abfss://moviesoutputdata@moviesdatastorageacc224.dfs.core.windows.net/catchingCorruptRecordUsers"

(
    userReviewsLoadDF
    .filter(col("_corrupt_records").isNotNull())
    .repartition(1)
    .write
    .option("header", True)
    .csv(output_pathcorrU, mode="overwrite")
)

In [None]:
# Project the user-review fields of interest. Discard rows where any of the
# descriptive fields is missing (movieId and rating are not required), and
# list the highest ratings first.
userReviewsDF = (
    userReviewsLoadDF
    .select("movieId", "rating", "quote", "isVerified", "isSuperReviewer",
            "hasSpoilers", "hasProfanity", "userRealm", "userId")
    .dropna(how="any",
            subset=["quote", "isVerified", "isSuperReviewer", "hasSpoilers",
                    "hasProfanity", "userRealm", "userId"])
    .orderBy(col("rating").desc())
)

userReviewsDF.show(10)


+--------------------+------+--------------------+----------+---------------+-----------+------------+---------+---------+
|             movieId|rating|               quote|isVerified|isSuperReviewer|hasSpoilers|hasProfanity|userRealm|   userId|
+--------------------+------+--------------------+----------+---------------+-----------+------------+---------+---------+
|cc778bc8-58db-31c...|   5.0|[font=Arial]This ...|     false|          false|      false|       false|       RT|900779813|
|cc778bc8-58db-31c...|   5.0|I really enjoyed ...|     false|          false|      false|       false|       RT|900780354|
|55f6efc8-0f2f-3ca...|   5.0|Am I retarded, th...|     false|          false|      false|       false|       RT|900659810|
|3da46dee-cea6-3eb...|   5.0|and its like tota...|     false|          false|      false|       false|       RT|900612371|
|55f6efc8-0f2f-3ca...|   5.0|Not many movies c...|     false|          false|      false|       false|       RT|900742422|
|55f6efc8-0f2f-3

In [None]:
#Writing the result to Azure Data Lake Storage
# Persist the *cleaned* user reviews (userReviewsDF), mirroring how the cleaned
# critic reviews are written. The corrupt records are exported separately in an
# earlier cell. repartition(1) yields a single part file; the header row keeps
# the column names.
output_path2 = "abfss://moviesoutputdata@moviesdatastorageacc224.dfs.core.windows.net/usersReview.csv"

(
    userReviewsDF
    .repartition(1)
    .write
    .option("header", True)
    .csv(output_path2, mode="overwrite")
)