# 1. Import necessary libraries

In [53]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, regexp_extract, regexp_replace, split

# 2. Creating a Spark Session

In [2]:
spark = SparkSession.builder.appName('data-exploration').getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/21 23:22:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# 3. Define CSV file paths

In [8]:
import os

# Get the current directory
current_dir = os.getcwd()

# Calculate the base directory (one level up)
base_dir = os.path.abspath(os.path.join(current_dir, ".."))

# Construct the file path in a parameterized way:
links_csv_path = os.path.join(base_dir, "data", "raw", "ml-latest-small", "links.csv")
movies_csv_path = os.path.join(base_dir, "data", "raw", "ml-latest-small", "movies.csv")
ratings_csv_path = os.path.join(base_dir, "data", "raw", "ml-latest-small", "ratings.csv")
tags_csv_path = os.path.join(base_dir, "data", "raw", "ml-latest-small", "tags.csv")

print("File paths are :")
print(f" {links_csv_path} \n {movies_csv_path} \n {ratings_csv_path} \n {tags_csv_path}")

File paths are :
 /Users/test/PycharmProjects/spark-data-ingestion-cleaning/data/raw/ml-latest-small/links.csv 
 /Users/test/PycharmProjects/spark-data-ingestion-cleaning/data/raw/ml-latest-small/movies.csv 
 /Users/test/PycharmProjects/spark-data-ingestion-cleaning/data/raw/ml-latest-small/ratings.csv 
 /Users/test/PycharmProjects/spark-data-ingestion-cleaning/data/raw/ml-latest-small/tags.csv


# 4. Analysing Movies CSV file

## 4.1 Read and display sample & count

In [57]:
movies_df = spark.read.csv(movies_csv_path, header="true")

movies_df.show(10, False)

print(f"Movies count : {movies_df.count()}")

+-------+----------------------------------+-------------------------------------------+
|movieId|title                             |genres                                     |
+-------+----------------------------------+-------------------------------------------+
|1      |Toy Story (1995)                  |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Jumanji (1995)                    |Adventure|Children|Fantasy                 |
|3      |Grumpier Old Men (1995)           |Comedy|Romance                             |
|4      |Waiting to Exhale (1995)          |Comedy|Drama|Romance                       |
|5      |Father of the Bride Part II (1995)|Comedy                                     |
|6      |Heat (1995)                       |Action|Crime|Thriller                      |
|7      |Sabrina (1995)                    |Comedy|Romance                             |
|8      |Tom and Huck (1995)               |Adventure|Children                         |
|9      |Sudden Death

## 4.2 Verify distinct matches count

In [17]:
print(f"Movies distinct count : {movies_df.distinct().count()}")

print(f"Movies 'movieId' distinct count : {movies_df.select('movieId').distinct().count()}")

print(f"Movies 'title' distinct count : {movies_df.select('title').distinct().count()} (---different)")

print(f"Movies 'genres' distinct count : {movies_df.select('genres').distinct().count()} (--can have duplicates)")

Movies distinct count : 9742
Movies 'movieId' distinct count : 9742
Movies 'title' distinct count : 9737 (---different)
Movies 'genres' distinct count : 951 (--can have duplicates)


## 4.3 Lets analyse duplicate titles

In [32]:
print("Titles that are duplicates are ")

duplicate_titles_df = movies_df.groupBy('title').count().orderBy('count', ascending=False)

duplicate_titles_df.show(10, False)

Titles that are duplicates are 
+--------------------------------------+-----+
|title                                 |count|
+--------------------------------------+-----+
|Eros (2004)                           |2    |
|Confessions of a Dangerous Mind (2002)|2    |
|Emma (1996)                           |2    |
|Saturn 3 (1980)                       |2    |
|War of the Worlds (2005)              |2    |
|Fair Game (1995)                      |1    |
|If Lucy Fell (1996)                   |1    |
|Three Wishes (1995)                   |1    |
|Heavenly Creatures (1994)             |1    |
|Snow White and the Seven Dwarfs (1937)|1    |
+--------------------------------------+-----+
only showing top 10 rows



In [40]:
# Lets see the duplicate title rows
duplicate_titles = duplicate_titles_df.filter(col('count') > 1).select('title').collect()

duplicate_titles = [row['title'] for row in duplicate_titles]

movies_df.filter(col('title').isin(duplicate_titles)).orderBy('title').show(20, False)

print('Movies have same titles but different slight different genres')

+-------+--------------------------------------+-----------------------------------+
|movieId|title                                 |genres                             |
+-------+--------------------------------------+-----------------------------------+
|6003   |Confessions of a Dangerous Mind (2002)|Comedy|Crime|Drama|Thriller        |
|144606 |Confessions of a Dangerous Mind (2002)|Comedy|Crime|Drama|Romance|Thriller|
|838    |Emma (1996)                           |Comedy|Drama|Romance               |
|26958  |Emma (1996)                           |Romance                            |
|32600  |Eros (2004)                           |Drama                              |
|147002 |Eros (2004)                           |Drama|Romance                      |
|2851   |Saturn 3 (1980)                       |Adventure|Sci-Fi|Thriller          |
|168358 |Saturn 3 (1980)                       |Sci-Fi|Thriller                    |
|34048  |War of the Worlds (2005)              |Action|Adventure|

## 4.4 Top 20 Genres

In [30]:
print("Top 20 genres are ")

movies_df.groupBy('genres').count().orderBy('count', ascending=False).show(20, False)

Top 20 genres are 
+-----------------------+-----+
|genres                 |count|
+-----------------------+-----+
|Drama                  |1053 |
|Comedy                 |946  |
|Comedy|Drama           |435  |
|Comedy|Romance         |363  |
|Drama|Romance          |349  |
|Documentary            |339  |
|Comedy|Drama|Romance   |276  |
|Drama|Thriller         |168  |
|Horror                 |167  |
|Horror|Thriller        |135  |
|Crime|Drama            |134  |
|Crime|Drama|Thriller   |125  |
|Drama|War              |114  |
|Comedy|Crime           |101  |
|Action|Comedy          |92   |
|Thriller               |84   |
|Children|Comedy        |74   |
|Comedy|Horror          |69   |
|Action|Crime|Thriller  |66   |
|Action|Adventure|Sci-Fi|66   |
+-----------------------+-----+
only showing top 20 rows



## 4.5 Extract year from title

In [58]:
movies_df = movies_df.withColumn("year", regexp_extract("title", r"\((\d{4})\)", 1).cast('int'))
movies_df.show()
movies_df.printSchema()

+-------+--------------------+--------------------+----+
|movieId|               title|              genres|year|
+-------+--------------------+--------------------+----+
|      1|    Toy Story (1995)|Adventure|Animati...|1995|
|      2|      Jumanji (1995)|Adventure|Childre...|1995|
|      3|Grumpier Old Men ...|      Comedy|Romance|1995|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|1995|
|      5|Father of the Bri...|              Comedy|1995|
|      6|         Heat (1995)|Action|Crime|Thri...|1995|
|      7|      Sabrina (1995)|      Comedy|Romance|1995|
|      8| Tom and Huck (1995)|  Adventure|Children|1995|
|      9| Sudden Death (1995)|              Action|1995|
|     10|    GoldenEye (1995)|Action|Adventure|...|1995|
|     11|American Presiden...|Comedy|Drama|Romance|1995|
|     12|Dracula: Dead and...|       Comedy|Horror|1995|
|     13|        Balto (1995)|Adventure|Animati...|1995|
|     14|        Nixon (1995)|               Drama|1995|
|     15|Cutthroat Island ...|A

## 4.6 Update title

In [59]:
movies_df = movies_df.withColumn("title", regexp_replace("title", r"\s*\(\d{4}\)", ""))
movies_df.show(truncate=False)
movies_df.printSchema()

+-------+------------------------------+-------------------------------------------+----+
|movieId|title                         |genres                                     |year|
+-------+------------------------------+-------------------------------------------+----+
|1      |Toy Story                     |Adventure|Animation|Children|Comedy|Fantasy|1995|
|2      |Jumanji                       |Adventure|Children|Fantasy                 |1995|
|3      |Grumpier Old Men              |Comedy|Romance                             |1995|
|4      |Waiting to Exhale             |Comedy|Drama|Romance                       |1995|
|5      |Father of the Bride Part II   |Comedy                                     |1995|
|6      |Heat                          |Action|Crime|Thriller                      |1995|
|7      |Sabrina                       |Comedy|Romance                             |1995|
|8      |Tom and Huck                  |Adventure|Children                         |1995|
|9      |S

## 4.7 Update genres to array of string field

In [60]:
movies_df = movies_df.withColumn("genres", split("genres", "\|"))

movies_df.show(truncate=False)
movies_df.printSchema()

+-------+------------------------------+-------------------------------------------------+----+
|movieId|title                         |genres                                           |year|
+-------+------------------------------+-------------------------------------------------+----+
|1      |Toy Story                     |[Adventure, Animation, Children, Comedy, Fantasy]|1995|
|2      |Jumanji                       |[Adventure, Children, Fantasy]                   |1995|
|3      |Grumpier Old Men              |[Comedy, Romance]                                |1995|
|4      |Waiting to Exhale             |[Comedy, Drama, Romance]                         |1995|
|5      |Father of the Bride Part II   |[Comedy]                                         |1995|
|6      |Heat                          |[Action, Crime, Thriller]                        |1995|
|7      |Sabrina                       |[Comedy, Romance]                                |1995|
|8      |Tom and Huck                  |

  movies_df = movies_df.withColumn("genres", split("genres", "\|"))
