### Spark
Apache Spark is a distributed data processing framework. Before you start, spend some time reading the documentation below, which will help you understand the concepts.
https://en.wikipedia.org/wiki/Apache_Spark

- Source : https://github.com/ajay291491/Mastering-Big-Data-Analytics-with-PySpark
- Course : https://learning.oreilly.com/videos/mastering-big-data/9781838640583/

## Chapter 01

### Spark - How it works 
Spark execution involves three main components:

##### Spark Context
The SparkContext is created inside the driver program and holds the application's settings (memory, etc.).
When we run any Spark application, a driver program starts; it contains the main function, and your SparkContext is initialized there. The driver program then runs the operations inside the executors on the worker nodes.
##### Cluster manager
Manages the cluster's resources, for example with YARN.
##### Executor
Runs the tasks sent by the SparkContext.
 
Flow :  "Spark Context" --> "Cluster manager" --> "Executor"

Note : To learn more about PySpark, refer to https://www.tutorialspoint.com/pyspark/pyspark_quick_guide.htm

#### Components of Spark

##### RDD (Resilient Distributed Dataset)
The RDD is the fundamental data structure of Apache Spark: an immutable collection of objects that is computed on different nodes of the cluster. Each dataset in an RDD is logically partitioned across many servers so that the partitions can be computed on different nodes of the cluster.
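
The idea of an immutable, partitioned collection can be sketched in plain Python. This is only an analogy that runs on one machine, not Spark code; the helper names `partition` and `map_partitions` are made up for illustration.

```python
from functools import reduce


def partition(items, num_partitions):
    """Split a sequence into roughly equal, immutable chunks (tuples)."""
    items = tuple(items)
    size = max(1, -(-len(items) // num_partitions))  # ceiling division
    return tuple(items[i:i + size] for i in range(0, len(items), size))


def map_partitions(partitions, fn):
    """Apply fn to every element and return NEW partitions; the input is untouched."""
    return tuple(tuple(fn(x) for x in part) for part in partitions)


numbers = partition(range(1, 11), num_partitions=3)
squares = map_partitions(numbers, lambda x: x * x)

# Each partition is reduced on its own, then the partial results are combined,
# which is roughly how work is spread over executors.
partial_sums = [sum(part) for part in squares]
total = reduce(lambda a, b: a + b, partial_sums)

print(numbers)       # the original partitions are unchanged
print(partial_sums)
print(total)
```

Because every transformation returns a new collection, the original partitions can be shared safely between workers.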

##### Spark Streaming 
This is used for analyzing data in streams; the data is normally sent in mini-batches for analysis.
With streaming, the DataFrame that was initialized keeps growing as new mini-batches of the stream are added.
You can integrate Kinesis streaming with Spark Streaming.
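
A minimal plain-Python sketch of the mini-batch idea, assuming a fixed batch size; it is not Spark Streaming code, and `micro_batches`, `table` and `batch_size` are hypothetical names used only for illustration.

```python
def micro_batches(events, batch_size):
    """Yield consecutive fixed-size batches from an iterable of events."""
    batch = []
    for event in events:
        batch.append(event)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:  # final, possibly smaller, batch
        yield batch


table = []   # stands in for the ever-growing streaming DataFrame
sizes = []   # size of the table after each mini-batch arrives
for batch in micro_batches(range(7), batch_size=3):
    table.extend(batch)
    sizes.append(len(table))

print(sizes)
```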

##### Spark SQL
Spark SQL is a Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine

##### MLlib
This is Spark's machine learning library.

##### GraphX
This is Spark's API for graphs and graph-parallel computation.

#### Spark MLlib
MLlib is the machine learning library used by PySpark; its goal is to make practical machine learning scalable and easy. At a high level, MLlib provides tools such as:
- ML algorithms : Common learning algorithms such as classification, regression, clustering and collaborative filtering
- Featurization : Feature extraction, transformation, dimensionality reduction and selection
- Pipelines     : Tools for constructing, evaluating and tuning ML pipelines
- Persistence   : Saving and loading algorithms, models and pipelines
- Utilities     : Linear algebra, statistics, data handling, etc.

#### Spark DataFrame
A DataFrame is a distributed collection of rows (a dataset) organized into named columns.
- It can be used for relational transformations
- As part of the pyspark.sql package, it allows you to run queries over data
- It is faster than RDDs (legacy) due to query plan optimization
- To know more : https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame

##### Spark DataFrame and RDD
- A Spark DataFrame is built on top of RDDs
- RDDs are immutable, which means an RDD cannot be altered once it is created
- Since they are immutable, they are easy and safe to share across multiple processes
- They can be created at any time and can live both in memory and on disk

#### Spark SQL
Spark SQL is the library that helps you work with DataFrames.
- It makes it easy to load and evaluate data
- It executes SQL queries in Spark
- It offers a DataFrame API with a rich library of functions
- It integrates with Hadoop and Hive
- The Data Source API has many built-in integrations with various data sources
- JDBC/ODBC connectivity

#### Reading CSV Dataset
Below is the step-by-step procedure to read a CSV file in Spark.
It also shows several methods of reading CSV:

- Reading without any parameters
- Reading with standard parameters
- Reading with a custom schema while loading data

## Chapter 02

##### Reading without any special parameters
The data gets read this way, but the header row is treated as data and every column is loaded as a string, so this is not usually the preferred way of creating a dataframe

In [None]:
# Initializing a spark sql session 
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("MyFirstCSVLoad").getOrCreate()

In [2]:
df = spark.read.csv("data-sets/ml-latest-small/ratings.csv")

21/09/03 15:59:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Reading First 5 rows in the dataframe
df.show(5)

+------+-------+------+---------+
|   _c0|    _c1|   _c2|      _c3|
+------+-------+------+---------+
|userId|movieId|rating|timestamp|
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
+------+-------+------+---------+
only showing top 5 rows



In [4]:
# Reading the schema of the dataframe 
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: string (nullable = true)



##### Reading with standard parameter sets (more standard way of creating a dataframe)
When we create a dataframe, we can additionally provide a few parameters:
- path : Path of the file to read
- sep  : Sets a single character as the separator for each field and value. If None is set, it uses the default value, a comma (,).
- header : Uses the first line as names of columns. If None is set, it uses the default value, false.
- quote : Sets a single character used for escaping quoted values where the separator can be part of the value. If None is set, it uses the default value, ". If you would like to turn off quotations, you need to set an empty string.
- inferSchema : Infers the input schema automatically from data. It requires one extra pass over the data. If None is set, it uses the default value, false.
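
To make the effect of these parameters concrete, here is a rough plain-Python analogue built on the standard `csv` module. It is a sketch, not how Spark implements its reader: `read_csv`, `infer` and the sample strings are hypothetical, and the type inference works value by value, whereas Spark infers one type per column.

```python
import csv
import io

RATINGS = "userId,movieId,rating\n1,1,4.0\n1,3,4.0\n"
QUOTED = '251,"Hunted, The (1995)",Action\n'


def infer(value):
    """Try int, then float, else keep the string (crude stand-in for inferSchema)."""
    for cast in (int, float):
        try:
            return cast(value)
        except ValueError:
            pass
    return value


def read_csv(text, sep=",", header=False, quote='"', infer_schema=False):
    rows = list(csv.reader(io.StringIO(text), delimiter=sep, quotechar=quote))
    if header:
        names, rows = rows[0], rows[1:]           # first line becomes column names
    else:
        names = [f"_c{i}" for i in range(len(rows[0]))]  # default names, header stays in the data
    if infer_schema:
        rows = [[infer(v) for v in row] for row in rows]
    return names, rows


default_names, default_rows = read_csv(RATINGS)
names, rows = read_csv(RATINGS, header=True, infer_schema=True)
_, quoted_rows = read_csv(QUOTED)

print(default_names, default_rows[0])
print(names, rows[0])
print(quoted_rows[0])  # the comma inside the quotes is not treated as a separator
```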

In [45]:
# More standard way of Creating a dataframe 
df = spark.read.csv(
    path="data-sets/ml-latest-small/ratings.csv",
    sep=",",
    header=True,
    quote='"',
    inferSchema=True,
)

In [9]:
df.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [10]:
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



##### How to change the schema while loading a dataframe
We can change the column names and the column types by setting the schema manually. The schema is applied by position, so the names in the schema replace the names in the CSV header.

In [51]:
# Here we manually set the column names and their types
df = spark.read.csv(
    path="data-sets/ml-latest-small/ratings.csv",
    sep=",",
    header=True,
    quote='"',
    schema="userID INT, movieID INT, score DOUBLE, timestamp INT",
)

In [12]:
df.show(5)

+------+-------+-----+---------+
|userID|movieID|score|timestamp|
+------+-------+-----+---------+
|     1|      1|  4.0|964982703|
|     1|      3|  4.0|964981247|
|     1|      6|  4.0|964982224|
|     1|     47|  5.0|964983815|
|     1|     50|  5.0|964982931|
+------+-------+-----+---------+
only showing top 5 rows



21/09/03 16:26:32 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: userId, movieId, rating, timestamp
 Schema: userID, movieID, score, timestamp
Expected: score but found: rating
CSV file: file:///study_docs/study_documents/notebooks/Pyspark_Guide/data-sets/ml-latest-small/ratings.csv


In [13]:
df.printSchema()

root
 |-- userID: integer (nullable = true)
 |-- movieID: integer (nullable = true)
 |-- score: double (nullable = true)
 |-- timestamp: integer (nullable = true)
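
The following plain-Python sketch shows the idea of applying a schema by position: the names in the schema win over the names in the file header, and the values are cast to the declared types. It is an illustration only; `parse_schema`, `read_with_schema` and `CASTS` are hypothetical helpers, and the case-insensitive header check is an assumption chosen because the warning in the output above flags only score versus rating.

```python
import csv
import io

SAMPLE = "userId,movieId,rating,timestamp\n1,1,4.0,964982703\n"
CASTS = {"INT": int, "DOUBLE": float, "STRING": str}


def parse_schema(ddl):
    """Turn 'name TYPE, name TYPE' into [(name, cast), ...]."""
    fields = []
    for part in ddl.split(","):
        name, type_name = part.split()
        fields.append((name, CASTS[type_name.upper()]))
    return fields


def read_with_schema(text, ddl):
    fields = parse_schema(ddl)
    reader = csv.reader(io.StringIO(text))
    file_header = next(reader)
    # Report schema names that differ from the header (ignoring case).
    mismatches = [
        (name, found)
        for (name, _), found in zip(fields, file_header)
        if name.lower() != found.lower()
    ]
    # Columns are matched by position; the schema supplies names and types.
    records = [
        {name: cast(value) for (name, cast), value in zip(fields, row)}
        for row in reader
    ]
    return records, mismatches


records, mismatches = read_with_schema(
    SAMPLE, "userID INT, movieID INT, score DOUBLE, timestamp INT"
)
print(records[0])
print(mismatches)
```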



## Chapter 03

#### Fixing issues in the data - Part 01
- In this topic we will see how to explore the data and fix it as needed.
- We will use the module "pyspark.sql.functions" for this purpose
- Detailed doc: https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#module-pyspark.sql.functions

##### Task : Convert the Unix timestamp to a human-readable format and remove the original timestamp column
- Import the Spark SQL functions module
- Rename the existing column named timestamp to timestamp_unix
- Append a new column holding the human-readable form of the timestamp_unix column
- Complete all the above steps in a single chained expression
- Complete all the above steps while creating the dataframe
- Drop the timestamp_unix column

In [29]:
# Importing the Spark SQL functions module
from pyspark.sql import functions as f

In [49]:
# Run cell 51 to initialize the dataframe before this
# Renaming an existing column and adding a new column
df = df.withColumnRenamed("timestamp", "timestamp_unix")           # Rename timestamp to timestamp_unix
df = df.withColumn("timestamp", f.from_unixtime("timestamp_unix")) # New string column: timestamp_unix formatted as human-readable text by 'f.from_unixtime'
df = df.withColumn("timestamp", f.to_timestamp("timestamp"))       # Convert the timestamp column from string to timestamp type
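
As a cross-check of what the conversion computes, here is a plain-Python sketch using `datetime`; the local function `from_unixtime` only mimics the Spark function of the same name. In Spark the text is rendered in the session time zone. The notebook output shows 19:45:03 for the first row, which is consistent with a zone one hour ahead of UTC; that is an inference from the numbers, not something stated in the source.

```python
from datetime import datetime, timedelta, timezone


def from_unixtime(seconds, tz=timezone.utc):
    """Format seconds since the Unix epoch as 'YYYY-MM-DD HH:MM:SS' in the given zone."""
    return datetime.fromtimestamp(seconds, tz=tz).strftime("%Y-%m-%d %H:%M:%S")


utc_text = from_unixtime(964982703)
plus_one_text = from_unixtime(964982703, tz=timezone(timedelta(hours=1)))

# Parsing the text back into a datetime object is the analogue of f.to_timestamp.
parsed = datetime.strptime(plus_one_text, "%Y-%m-%d %H:%M:%S")

print(utc_text)
print(plus_one_text)
print(parsed)
```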

In [50]:
df.show()
df.printSchema()

+------+-------+-----+--------------+-------------------+
|userID|movieID|score|timestamp_unix|          timestamp|
+------+-------+-----+--------------+-------------------+
|     1|      1|  4.0|     964982703|2000-07-30 19:45:03|
|     1|      3|  4.0|     964981247|2000-07-30 19:20:47|
|     1|      6|  4.0|     964982224|2000-07-30 19:37:04|
|     1|     47|  5.0|     964983815|2000-07-30 20:03:35|
|     1|     50|  5.0|     964982931|2000-07-30 19:48:51|
|     1|     70|  3.0|     964982400|2000-07-30 19:40:00|
|     1|    101|  5.0|     964980868|2000-07-30 19:14:28|
|     1|    110|  4.0|     964982176|2000-07-30 19:36:16|
|     1|    151|  5.0|     964984041|2000-07-30 20:07:21|
|     1|    157|  5.0|     964984100|2000-07-30 20:08:20|
|     1|    163|  5.0|     964983650|2000-07-30 20:00:50|
|     1|    216|  5.0|     964981208|2000-07-30 19:20:08|
|     1|    223|  3.0|     964980985|2000-07-30 19:16:25|
|     1|    231|  5.0|     964981179|2000-07-30 19:19:39|
|     1|    23

21/09/03 17:15:47 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: userId, movieId, rating, timestamp
 Schema: userID, movieID, score, timestamp
Expected: score but found: rating
CSV file: file:///study_docs/study_documents/notebooks/Pyspark_Guide/data-sets/ml-latest-small/ratings.csv


##### Now let's do the same steps as above in a single chained expression

In [52]:
# Run cell 51 to initialize the dataframe before this
df = (
    df
    .withColumnRenamed("timestamp", "timestamp_unix")           # Rename timestamp to timestamp_unix
    .withColumn("timestamp", f.from_unixtime("timestamp_unix")) # New string column: timestamp_unix formatted as human-readable text by 'f.from_unixtime'
    .withColumn("timestamp", f.to_timestamp("timestamp"))       # Convert the timestamp column from string to timestamp type
)

In [53]:
df.show(5)
df.printSchema()

+------+-------+-----+--------------+-------------------+
|userID|movieID|score|timestamp_unix|          timestamp|
+------+-------+-----+--------------+-------------------+
|     1|      1|  4.0|     964982703|2000-07-30 19:45:03|
|     1|      3|  4.0|     964981247|2000-07-30 19:20:47|
|     1|      6|  4.0|     964982224|2000-07-30 19:37:04|
|     1|     47|  5.0|     964983815|2000-07-30 20:03:35|
|     1|     50|  5.0|     964982931|2000-07-30 19:48:51|
+------+-------+-----+--------------+-------------------+
only showing top 5 rows

root
 |-- userID: integer (nullable = true)
 |-- movieID: integer (nullable = true)
 |-- score: double (nullable = true)
 |-- timestamp_unix: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)



21/09/03 17:16:05 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: userId, movieId, rating, timestamp
 Schema: userID, movieID, score, timestamp
Expected: score but found: rating
CSV file: file:///study_docs/study_documents/notebooks/Pyspark_Guide/data-sets/ml-latest-small/ratings.csv


#### Let's combine all the above steps into a single step to create the "ratings" dataframe
- Create the dataframe
- Set the schema
- Rename the original timestamp column to timestamp_unix
- Generate a new column named timestamp, which converts the Unix timestamp to a human-readable value
- Convert the new column to the timestamp type

In [5]:
# Initializing a spark sql session 
from pyspark.sql import SparkSession
from pyspark.sql import functions as f 

spark = SparkSession.builder.appName("MyFirstCSVLoad").getOrCreate()
ratings_df = (
    spark.read.csv(
        path="data-sets/ml-latest-small/ratings.csv",
        sep=",",
        header=True,
        quote='"',
        schema="userId INT, movieId INT, rating DOUBLE, timestamp INT"
    )
    .withColumnRenamed("timestamp", "timestamp_unix")
    .withColumn("timestamp", f.to_timestamp(f.from_unixtime("timestamp_unix")))
)

In [6]:
ratings_df.show(5)
ratings_df.printSchema()

+------+-------+------+--------------+-------------------+
|userId|movieId|rating|timestamp_unix|          timestamp|
+------+-------+------+--------------+-------------------+
|     1|      1|   4.0|     964982703|2000-07-30 19:45:03|
|     1|      3|   4.0|     964981247|2000-07-30 19:20:47|
|     1|      6|   4.0|     964982224|2000-07-30 19:37:04|
|     1|     47|   5.0|     964983815|2000-07-30 20:03:35|
|     1|     50|   5.0|     964982931|2000-07-30 19:48:51|
+------+-------+------+--------------+-------------------+
only showing top 5 rows

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp_unix: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)



##### How to drop a column from a dataframe

In [7]:
# drop() returns a new dataframe without the column (ratings_df itself is unchanged); show() displays the result
ratings_df.drop("timestamp_unix").show(5)

+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
|     1|      1|   4.0|2000-07-30 19:45:03|
|     1|      3|   4.0|2000-07-30 19:20:47|
|     1|      6|   4.0|2000-07-30 19:37:04|
|     1|     47|   5.0|2000-07-30 20:03:35|
|     1|     50|   5.0|2000-07-30 19:48:51|
+------+-------+------+-------------------+
only showing top 5 rows



## Chapter 04

#### Fixing issues in the data - Part 02

### Tasks
- Create the movies dataframe from movies.csv
- Filter the data with a specific keyword

In [41]:
# Creating Dataframe from movies.csv

movies_df = spark.read.csv(
    path="data-sets/ml-latest-small/movies.csv",
    sep=",",
    quote='"',
    header=True,
    schema="movieId INT, title STRING, genres STRING "
)

In [42]:
movies_df.show(15)
movies_df.printSchema()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
+-------+--------------------+--------------------+
only showing

###### where() & f.col() - Filter the data with a specific keyword
- movies_df.where() - Filters the rows that match a condition
- f.col("genres")   - Accesses the column named "genres"

In [43]:
movies_df.where(f.col("genres") == 'Action').show()

+-------+--------------------+------+
|movieId|               title|genres|
+-------+--------------------+------+
|      9| Sudden Death (1995)|Action|
|     71|    Fair Game (1995)|Action|
|    204|Under Siege 2: Da...|Action|
|    251|  Hunted, The (1995)|Action|
|    667|Bloodsport 2 (a.k...|Action|
|   1170|Best of the Best ...|Action|
|   1497|  Double Team (1997)|Action|
|   1599|        Steel (1997)|Action|
|   2196|    Knock Off (1998)|Action|
|   2534|    Avalanche (1978)|Action|
|   2817|Aces: Iron Eagle ...|Action|
|   2965|Omega Code, The (...|Action|
|   3283|Minnie and Moskow...|Action|
|   3444|   Bloodsport (1988)|Action|
|   3769|Thunderbolt and L...|Action|
|   4200|Double Impact (1991)|Action|
|   4387|Kiss of the Drago...|Action|
|   4441|Game of Death (1978)|Action|
|   4531|     Red Heat (1988)|Action|
|   4568|Best of the Best ...|Action|
+-------+--------------------+------+
only showing top 20 rows



##### f.split() - Using split to create an array
- The above example shows only the exact matches for the genre "Action"
- This is because the genres of a movie are separated by a pipe (|), for example "Action|Crime|Thriller"
- So we need to split on the pipe delimiter to convert the value into an array

Let's create a new dataframe with the genres as a list from the main dataframe

In [44]:
"""
Creating a new dataframe called movie_genre_df with an additional column 'generes_array'
which contains the values of genres split on the pipe character
Note : split takes the delimiter as a regex, so the pipe must be escaped
(a raw string keeps Python from interpreting the backslash)
"""
movie_genre_df = (
    movies_df.withColumn("generes_array", f.split("genres", r"\|"))
)

In the output below, a new column of type array has been created, with the genres as its elements

In [26]:
movie_genre_df.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- generes_array: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [45]:
movie_genre_df.show(10)

+-------+--------------------+--------------------+--------------------+
|movieId|               title|              genres|       generes_array|
+-------+--------------------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|[Adventure, Anima...|
|      2|      Jumanji (1995)|Adventure|Childre...|[Adventure, Child...|
|      3|Grumpier Old Men ...|      Comedy|Romance|   [Comedy, Romance]|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|[Comedy, Drama, R...|
|      5|Father of the Bri...|              Comedy|            [Comedy]|
|      6|         Heat (1995)|Action|Crime|Thri...|[Action, Crime, T...|
|      7|      Sabrina (1995)|      Comedy|Romance|   [Comedy, Romance]|
|      8| Tom and Huck (1995)|  Adventure|Children|[Adventure, Child...|
|      9| Sudden Death (1995)|              Action|            [Action]|
|     10|    GoldenEye (1995)|Action|Adventure|...|[Action, Adventur...|
+-------+--------------------+--------------------+--------------------+
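
The difference between an exact match on the raw string and a membership test on the split array can be sketched in plain Python. The three sample rows are taken from the output above; the list comprehensions are only an analogy for `where()`. In PySpark, the membership test would typically be written with `f.array_contains`.

```python
import re

movies = [
    {"movieId": 6, "title": "Heat (1995)", "genres": "Action|Crime|Thriller"},
    {"movieId": 7, "title": "Sabrina (1995)", "genres": "Comedy|Romance"},
    {"movieId": 9, "title": "Sudden Death (1995)", "genres": "Action"},
]

# Exact match on the raw string only finds movies whose ONLY genre is Action.
exact = [m["movieId"] for m in movies if m["genres"] == "Action"]

# "|" is the regex alternation operator, so it is escaped to split on a literal pipe.
with_arrays = [{**m, "genres_array": re.split(r"\|", m["genres"])} for m in movies]

# Membership test on the array finds every movie that lists Action.
contains = [m["movieId"] for m in with_arrays if "Action" in m["genres_array"]]

print(exact)
print(with_arrays[0]["genres_array"])
print(contains)
```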

##### f.explode - Let's extract each element of the array and create a new row from it
- Now let's take each element of that array and create one row per element
- f.explode() : Creates new rows from the elements of a specific array

In [None]:
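# Below we use f.explode on "generes_array" to create a new column "genre", with one row per array element
# Caution: this cell reassigns movie_genre_df, so re-running it explodes the already exploded rows again
# and duplicates them. Re-run cell 44 first to start from a clean dataframe.
movie_genre_df = movie_genre_df.withColumn("genre", f.explode("generes_array"))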
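
A plain-Python sketch of what explode does, and of what happens when it is applied a second time to its own result. The `explode` helper below is a hypothetical stand-in for `f.explode`, and the two sample rows are taken from the output above.

```python
def explode(rows, array_key, out_key):
    """Return one new row per element of row[array_key]; rows with empty arrays produce nothing."""
    return [
        {**row, out_key: element}
        for row in rows
        for element in row[array_key]
    ]


rows = [
    {"movieId": 7, "generes_array": ["Comedy", "Romance"]},
    {"movieId": 9, "generes_array": ["Action"]},
]

once = explode(rows, "generes_array", "genre")
# Exploding the already exploded rows simulates re-running the notebook cell.
twice = explode(once, "generes_array", "genre")

print([(r["movieId"], r["genre"]) for r in once])
print(len(once), len(twice))
```

A movie with n genres ends up with n rows after one pass and n * n rows after two, while a movie with a single genre stays at one row. Repeated rows per movie in a result are therefore a sign that the step ran more than once.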

In [49]:
# Printing only the columns that we need
movie_genre_df.select("movieId", "title", "genre").show()
movie_genre_df.printSchema()

+-------+----------------+---------+
|movieId|           title|    genre|
+-------+----------------+---------+
|      1|Toy Story (1995)|Adventure|
|      1|Toy Story (1995)|Animation|
|      1|Toy Story (1995)| Children|
|      1|Toy Story (1995)|   Comedy|
|      1|Toy Story (1995)|  Fantasy|
|      1|Toy Story (1995)|Adventure|
|      1|Toy Story (1995)|Animation|
|      1|Toy Story (1995)| Children|
|      1|Toy Story (1995)|   Comedy|
|      1|Toy Story (1995)|  Fantasy|
|      1|Toy Story (1995)|Adventure|
|      1|Toy Story (1995)|Animation|
|      1|Toy Story (1995)| Children|
|      1|Toy Story (1995)|   Comedy|
|      1|Toy Story (1995)|  Fantasy|
|      1|Toy Story (1995)|Adventure|
|      1|Toy Story (1995)|Animation|
|      1|Toy Story (1995)| Children|
|      1|Toy Story (1995)|   Comedy|
|      1|Toy Story (1995)|  Fantasy|
+-------+----------------+---------+
only showing top 20 rows

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |--

###### Let's find the unique (distinct) values available in a column

- Using this you can see the different values available in a column
- It also reveals any invalid values in that column
- distinct() is the function we use to get the distinct values of a column

In [51]:
available_genres_df = movie_genre_df.select("genre").distinct()

In [52]:
available_genres_df.show()

+------------------+
|             genre|
+------------------+
|             Crime|
|           Romance|
|          Thriller|
|         Adventure|
|             Drama|
|               War|
|       Documentary|
|           Fantasy|
|           Mystery|
|           Musical|
|         Animation|
|         Film-Noir|
|(no genres listed)|
|              IMAX|
|            Horror|
|           Western|
|            Comedy|
|          Children|
|            Action|
|            Sci-Fi|
+------------------+



##### Now let's create a new dataframe containing only the rows that do not have any genre

In [56]:
# Accessing column "genre" and filtering on the value "(no genres listed)"

movies_without_genre_df = movie_genre_df.where(f.col("genre") == "(no genres listed)")

In [57]:
# Taking the count of total rows in the dataframe

movies_without_genre_df.count()

34

In [58]:
movies_without_genre_df.show()

+-------+--------------------+------------------+--------------------+------------------+
|movieId|               title|            genres|       generes_array|             genre|
+-------+--------------------+------------------+--------------------+------------------+
| 114335|   La cravate (1957)|(no genres listed)|[(no genres listed)]|(no genres listed)|
| 122888|      Ben-hur (2016)|(no genres listed)|[(no genres listed)]|(no genres listed)|
| 122896|Pirates of the Ca...|(no genres listed)|[(no genres listed)]|(no genres listed)|
| 129250|   Superfast! (2015)|(no genres listed)|[(no genres listed)]|(no genres listed)|
| 132084| Let It Be Me (1995)|(no genres listed)|[(no genres listed)]|(no genres listed)|
| 134861|Trevor Noah: Afri...|(no genres listed)|[(no genres listed)]|(no genres listed)|
| 141131|    Guardians (2016)|(no genres listed)|[(no genres listed)]|(no genres listed)|
| 141866|   Green Room (2015)|(no genres listed)|[(no genres listed)]|(no genres listed)|
| 142456|T

## Chapter 05

##### groupBy() - How to create groups based on specific column values
- groupBy() groups the rows by the values of a specific column
- It can be combined with other functions such as count, sum, etc. to produce valuable data


Let's group movie_genre_df by its "genre" column and see the count of movies per genre

In [76]:
# Creating a new dataframe that holds the row count per genre (count() counts rows, so duplicate rows are counted too)
movies_per_genre_df = movie_genre_df.groupBy("genre").count()

In [77]:
# This shows the total count of each genre
movies_per_genre_df.show()

+------------------+-----+
|             genre|count|
+------------------+-----+
|             Crime|12685|
|           Romance|13313|
|          Thriller|20039|
|         Adventure|17185|
|             Drama|29296|
|               War| 3436|
|       Documentary|  875|
|           Fantasy|11133|
|           Mystery| 7511|
|           Musical| 3609|
|         Animation| 8795|
|         Film-Noir| 1054|
|(no genres listed)|   34|
|              IMAX| 2912|
|            Horror| 7787|
|           Western| 1481|
|            Comedy|25150|
|          Children| 8874|
|            Action|21388|
|            Sci-Fi|11443|
+------------------+-----+
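
In plain Python, `distinct()` and `groupBy().count()` correspond roughly to a set and a `Counter`. The sketch below uses a small hypothetical list of (movieId, genre) pairs, with one pair deliberately repeated, to show that a row count and a count of distinct movies differ when the input contains duplicate rows.

```python
from collections import Counter

# (movieId, genre) pairs; the last pair is a deliberate duplicate row.
exploded = [
    (6, "Action"), (6, "Crime"), (6, "Thriller"),
    (7, "Comedy"), (7, "Romance"),
    (9, "Action"),
    (9, "Action"),
]

distinct_genres = sorted({genre for _, genre in exploded})

# Like groupBy("genre").count(): counts rows, duplicates included.
rows_per_genre = Counter(genre for _, genre in exploded)

# Counting each (movieId, genre) pair once gives movies per genre.
movies_per_genre = Counter(genre for _, genre in set(exploded))

print(distinct_genres)
print(rows_per_genre["Action"], movies_per_genre["Action"])
```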



##### join() - Joining two different dataframes together
- join() combines two dataframes on one or more columns
- Various join types are available, and the default is inner
- Other SQL joins such as full outer, left outer, right outer, left semi, left anti and cross joins are also available

To know more about joins : https://learning.oreilly.com/videos/mastering-big-data/9781838640583/9781838640583-video3_4/

In [83]:
# Reading the ratings.csv file (will be using this for joining with movies_df data frame)
ratings_df = (
    spark.read.csv(
        path="data-sets/ml-latest-small/ratings.csv",
        sep=",",
        quote='"',
        header=True,
    )
    .withColumnRenamed("timestamp", "timestamp_unix")
    .withColumn("timestamp", f.to_timestamp(f.from_unixtime("timestamp_unix")))
    .drop("timestamp_unix")
)

In [84]:
ratings_df.show()

+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
|     1|      1|   4.0|2000-07-30 19:45:03|
|     1|      3|   4.0|2000-07-30 19:20:47|
|     1|      6|   4.0|2000-07-30 19:37:04|
|     1|     47|   5.0|2000-07-30 20:03:35|
|     1|     50|   5.0|2000-07-30 19:48:51|
|     1|     70|   3.0|2000-07-30 19:40:00|
|     1|    101|   5.0|2000-07-30 19:14:28|
|     1|    110|   4.0|2000-07-30 19:36:16|
|     1|    151|   5.0|2000-07-30 20:07:21|
|     1|    157|   5.0|2000-07-30 20:08:20|
|     1|    163|   5.0|2000-07-30 20:00:50|
|     1|    216|   5.0|2000-07-30 19:20:08|
|     1|    223|   3.0|2000-07-30 19:16:25|
|     1|    231|   5.0|2000-07-30 19:19:39|
|     1|    235|   4.0|2000-07-30 19:15:08|
|     1|    260|   5.0|2000-07-30 19:28:00|
|     1|    296|   3.0|2000-07-30 19:49:27|
|     1|    316|   3.0|2000-07-30 19:38:30|
|     1|    333|   5.0|2000-07-30 19:19:39|
|     1|    349|   4.0|2000-07-3

Now let's do an inner join between movies_df and ratings_df

In [88]:
# Creating a new dataframe with the inner join from movies_df with ratings_df
opinion_df = movies_df.join(ratings_df, ["movieId"], "inner")

In [91]:
opinion_df.show()
opinion_df.count()

+-------+--------------------+--------------------+------+------+-------------------+
|movieId|               title|              genres|userId|rating|          timestamp|
+-------+--------------------+--------------------+------+------+-------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|     1|   4.0|2000-07-30 19:45:03|
|      3|Grumpier Old Men ...|      Comedy|Romance|     1|   4.0|2000-07-30 19:20:47|
|      6|         Heat (1995)|Action|Crime|Thri...|     1|   4.0|2000-07-30 19:37:04|
|     47|Seven (a.k.a. Se7...|    Mystery|Thriller|     1|   5.0|2000-07-30 20:03:35|
|     50|Usual Suspects, T...|Crime|Mystery|Thr...|     1|   5.0|2000-07-30 19:48:51|
|     70|From Dusk Till Da...|Action|Comedy|Hor...|     1|   3.0|2000-07-30 19:40:00|
|    101|Bottle Rocket (1996)|Adventure|Comedy|...|     1|   5.0|2000-07-30 19:14:28|
|    110|   Braveheart (1995)|    Action|Drama|War|     1|   4.0|2000-07-30 19:36:16|
|    151|      Rob Roy (1995)|Action|Drama|Roma...|   

100836

In [93]:
# Creating a new dataframe with the Outer join from movies_df with ratings_df
opinion_outer_df = movies_df.join(ratings_df, ["movieId"], "outer")

In [94]:
opinion_outer_df.show()
opinion_outer_df.count()

+-------+--------------------+------+------+------+-------------------+
|movieId|               title|genres|userId|rating|          timestamp|
+-------+--------------------+------+------+------+-------------------+
|    148|Awfully Big Adven...| Drama|   191|   5.0|1996-04-17 18:08:17|
|    471|Hudsucker Proxy, ...|Comedy|    32|   3.0|1997-02-23 22:32:45|
|    471|Hudsucker Proxy, ...|Comedy|    57|   3.0|2000-09-24 01:00:04|
|    471|Hudsucker Proxy, ...|Comedy|    91|   1.0|2005-04-05 16:10:17|
|    471|Hudsucker Proxy, ...|Comedy|   104|   4.5|2009-03-26 23:45:29|
|    471|Hudsucker Proxy, ...|Comedy|   133|   4.0|1996-09-23 16:16:33|
|    471|Hudsucker Proxy, ...|Comedy|   136|   4.0|1996-05-18 21:07:38|
|    471|Hudsucker Proxy, ...|Comedy|   171|   3.0|1997-06-21 16:08:03|
|    471|Hudsucker Proxy, ...|Comedy|   176|   5.0|1996-08-15 12:37:55|
|    471|Hudsucker Proxy, ...|Comedy|   182|   4.5|2003-06-05 03:20:44|
|    471|Hudsucker Proxy, ...|Comedy|   216|   3.0|2000-11-26 04

100854
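
The two counts above differ by 18 rows (100854 versus 100836). An outer join keeps rows that have no partner on the other side, so the difference is consistent with 18 unmatched rows, presumably movies that nobody rated; that reading is an assumption, not something the output proves. The plain-Python sketch below shows the mechanics on a tiny hypothetical sample; `inner_join` and `outer_join` are illustrative helpers, not Spark functions.

```python
movies = {1: "Toy Story (1995)", 6: "Heat (1995)", 9: "Sudden Death (1995)"}
ratings = [(1, 1, 4.0), (1, 6, 4.0), (1, 47, 5.0)]  # (userId, movieId, rating)


def inner_join(movies, ratings):
    """Keep only ratings whose movieId exists in movies."""
    return [
        (movie_id, movies[movie_id], user_id, score)
        for user_id, movie_id, score in ratings
        if movie_id in movies
    ]


def outer_join(movies, ratings):
    """Keep every rating and every movie; missing sides are filled with None."""
    rows = [
        (movie_id, movies.get(movie_id), user_id, score)
        for user_id, movie_id, score in ratings
    ]
    rated = {movie_id for _, movie_id, _ in ratings}
    rows += [
        (movie_id, title, None, None)
        for movie_id, title in movies.items()
        if movie_id not in rated
    ]
    return rows


inner = inner_join(movies, ratings)
outer = outer_join(movies, ratings)

print(len(inner), len(outer))
print([row for row in outer if None in row])  # the rows only an outer join keeps
```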