# Processing data using Spark DataFrames with PySpark

In [1]:

# Ports: 8888 serves Jupyter, 4040 serves the Spark UI.
# docker run --name pyspark_jupyter_notebook -it -p 8888:8888 -p 4040:4040 jupyter/pyspark-notebook:latest

In [2]:
import sys
import json
import logging
import time
from datetime import datetime, date, timedelta

import boto3
import pytz
from pyspark import StorageLevel
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import udf
from pyspark.sql.types import *

# Spark session with the PostgreSQL JDBC driver on the classpath (Exercise 2)
# and Hive support, so saveAsTable registers tables in the Hive metastore.
spark = SparkSession.builder.appName("mine") \
    .config("spark.jars", "jar/postgresql-42.7.2.jar") \
    .config("spark.sql.catalogImplementation", "hive") \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .enableHiveSupport() \
    .getOrCreate()

# Log to stdout so messages appear in the notebook output.
logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)
logger = logging.getLogger()
logger.setLevel(logging.INFO)

logger.info('Spark Version :' + spark.version)

INFO:root:Spark Version :3.5.0


### Exercise 1: Use the MovieLens dataset for the following exercises
https://grouplens.org/datasets/movielens/latest/

1. Load movies.csv as the movies dataframe. Cache the dataframe.
2. Load ratings.csv as the ratings dataframe. Cache the dataframe.
3. Find the number of records in the movies dataframe.
4. Find the number of records in the ratings dataframe.
5. Validate that the userId and movieId combination is unique.
6. Find the average rating and rating count per movieId using the ratings dataframe.
7. Find the top 10 movies with the highest average ratings. Consider only movies that have at least 100 ratings. Show the movieId, title, average rating and rating count columns.
8. Show the temporary views for the current Spark session.
9. Register the movies and ratings dataframes as the movies and ratings temporary views, respectively. Verify that you can see the new temporary views you just created.
10. Using a SQL statement, solve the problem from step #7 and check that the results match step #7.
11. Find the average rating of each genre.

### 1. Load movies.csv as the movies dataframe. Cache the dataframe
### 3. Find the number of records in the movies dataframe



In [3]:
movies = spark.read.option("header", "true").csv("ml-latest-small/movies.csv").cache()
print(movies.count())
movies.show()

9742
+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     1

### 2. Load ratings.csv as the ratings dataframe. Cache the dataframe
### 4. Find the number of records in the ratings dataframe

In [4]:
ratings = spark.read.option("header", "true").csv("ml-latest-small/ratings.csv").cache()
print(ratings.count())
ratings.show()

100836
+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows


### 5. Validate that the userId and movieId combination is unique

In [5]:
# Count (userId, movieId) pairs that occur more than once; 0 means every pair is unique.
duplicate_count = ratings.groupBy("userId", "movieId").count().filter("count > 1").count()
print(duplicate_count)

0


### 6. Find the average rating and rating count per movieId using the ratings dataframe

In [6]:
average_rating = ratings.groupBy("movieId").agg(
    F.avg("rating").alias("average_rating"),
    F.count("rating").alias("rating_count"),
)
average_rating.show()

+-------+------------------+------------+
|movieId|    average_rating|rating_count|
+-------+------------------+------------+
|    296| 4.197068403908795|         307|
|   1090| 3.984126984126984|          63|
| 115713|3.9107142857142856|          28|
|   3210|3.4761904761904763|          42|
|  88140|          3.546875|          32|
|    829|2.6666666666666665|           9|
|   2088|               2.5|          18|
|   2294|3.2444444444444445|          45|
|   4821|               3.1|           5|
|  48738|             3.975|          20|
|   3959|             3.625|           8|
|  89864|3.6315789473684212|          19|
|   2136|2.4642857142857144|          14|
|    691|3.3333333333333335|           3|
|   3606|              3.75|           4|
| 121007|               4.0|           1|
|   6731|             3.625|           8|
|  27317|              3.75|           6|
|  26082|               4.5|           3|
| 100553|               4.5|           2|
+-------+------------------+------------+

In [7]:
# Preview the join. .show() returns None, so its result is not assigned.
movies.join(ratings, movies["movieId"] == ratings["movieId"], "inner").show()

+-------+--------------------+--------------------+------+-------+------+---------+
|movieId|               title|              genres|userId|movieId|rating|timestamp|
+-------+--------------------+--------------------+------+-------+------+---------+
|      1|    Toy Story (1995)|Adventure|Animati...|     1|      1|   4.0|964982703|
|      3|Grumpier Old Men ...|      Comedy|Romance|     1|      3|   4.0|964981247|
|      6|         Heat (1995)|Action|Crime|Thri...|     1|      6|   4.0|964982224|
|     47|Seven (a.k.a. Se7...|    Mystery|Thriller|     1|     47|   5.0|964983815|
|     50|Usual Suspects, T...|Crime|Mystery|Thr...|     1|     50|   5.0|964982931|
|     70|From Dusk Till Da...|Action|Comedy|Hor...|     1|     70|   3.0|964982400|
|    101|Bottle Rocket (1996)|Adventure|Comedy|...|     1|    101|   5.0|964980868|
|    110|   Braveheart (1995)|    Action|Drama|War|     1|    110|   4.0|964982176|
|    151|      Rob Roy (1995)|Action|Drama|Roma...|     1|    151|   5.0|964

### 7. Find the top 10 movies with the highest average ratings. Consider only movies that have at least 100 ratings. Show the movieId, title, average rating and rating count columns.

In [8]:
movies_10 = (
    average_rating.join(movies, average_rating["movieId"] == movies["movieId"], "inner")
    .select("title", "average_rating", "rating_count")
    .filter(F.col("rating_count") > 100)
    .orderBy(F.desc("average_rating"))
    .limit(10)
)
movies_10.show()

+--------------------+-----------------+------------+
|               title|   average_rating|rating_count|
+--------------------+-----------------+------------+
|Shawshank Redempt...|4.429022082018927|         317|
|Godfather, The (1...|        4.2890625|         192|
|   Fight Club (1999)|4.272935779816514|         218|
|Godfather: Part I...| 4.25968992248062|         129|
|Departed, The (2006)|4.252336448598131|         107|
|   Goodfellas (1990)|             4.25|         126|
|Dark Knight, The ...|4.238255033557047|         149|
|Usual Suspects, T...|4.237745098039215|         204|
|Princess Bride, T...|4.232394366197183|         142|
|Star Wars: Episod...|4.231075697211155|         251|
+--------------------+-----------------+------------+


### 8. Show the temporary views for the current Spark session

In [9]:
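# Spark UI for this session: http://localhost:4040 (port published by the docker run command).
# listTables() returns catalog tables and temporary views together.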
spark.catalog.listTables()

[Table(name='payment_transaction', catalog='spark_catalog', namespace=['default'], description=None, tableType='MANAGED', isTemporary=False)]

### 9. Register movies dataframe and ratings dataframe as movies and ratings temporary view respectively. Verify that you can see the new temporary views you just created.

In [10]:
movies.createOrReplaceTempView("movies")
ratings.createOrReplaceTempView("ratings")
average_rating.createOrReplaceTempView("average_rating")
spark.catalog.listTables()

[Table(name='payment_transaction', catalog='spark_catalog', namespace=['default'], description=None, tableType='MANAGED', isTemporary=False),
 Table(name='average_rating', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='movies', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='ratings', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

### 10. Using a SQL statement, solve the problem from step #7. Match the results from step #7.

In [11]:
movies_10_sql = spark.sql("""
    SELECT m.title, a.average_rating, a.rating_count
    FROM average_rating a
    JOIN movies m ON a.movieId = m.movieId
    WHERE a.rating_count > 100
    ORDER BY a.average_rating DESC
    LIMIT 10
""")
movies_10_sql.show()

+--------------------+-----------------+------------+
|               title|   average_rating|rating_count|
+--------------------+-----------------+------------+
|Shawshank Redempt...|4.429022082018927|         317|
|Godfather, The (1...|        4.2890625|         192|
|   Fight Club (1999)|4.272935779816514|         218|
|Godfather: Part I...| 4.25968992248062|         129|
|Departed, The (2006)|4.252336448598131|         107|
|   Goodfellas (1990)|             4.25|         126|
|Dark Knight, The ...|4.238255033557047|         149|
|Usual Suspects, T...|4.237745098039215|         204|
|Princess Bride, T...|4.232394366197183|         142|
|Star Wars: Episod...|4.231075697211155|         251|
+--------------------+-----------------+------------+


### 11. Find the average rating of each genre

In [12]:
# split() takes a regex, so the pipe is escaped (raw string); explode() emits one row per genre.
movies_genre = movies.withColumn("genre", F.explode(F.split("genres", r"\|")))
movies_genre.show()
movies_genre.createOrReplaceTempView("movies_genre")

+-------+--------------------+--------------------+---------+
|movieId|               title|              genres|    genre|
+-------+--------------------+--------------------+---------+
|      1|    Toy Story (1995)|Adventure|Animati...|Adventure|
|      1|    Toy Story (1995)|Adventure|Animati...|Animation|
|      1|    Toy Story (1995)|Adventure|Animati...| Children|
|      1|    Toy Story (1995)|Adventure|Animati...|   Comedy|
|      1|    Toy Story (1995)|Adventure|Animati...|  Fantasy|
|      2|      Jumanji (1995)|Adventure|Childre...|Adventure|
|      2|      Jumanji (1995)|Adventure|Childre...| Children|
|      2|      Jumanji (1995)|Adventure|Childre...|  Fantasy|
|      3|Grumpier Old Men ...|      Comedy|Romance|   Comedy|
|      3|Grumpier Old Men ...|      Comedy|Romance|  Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|   Comedy|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|    Drama|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|  Romance|
|      5

In [13]:
average_rating_genre = spark.sql("""
    SELECT genre, AVG(rating) AS average_rating
    FROM movies_genre mg
    JOIN ratings r ON mg.movieId = r.movieId
    GROUP BY genre
""")
average_rating_genre.show()

+------------------+------------------+
|             genre|    average_rating|
+------------------+------------------+
|             Crime| 3.658293867274144|
|           Romance|3.5065107040388437|
|          Thriller|3.4937055799183425|
|         Adventure|3.5086089151939075|
|             Drama|3.6561844113718758|
|               War|   3.8082938876312|
|       Documentary| 3.797785069729286|
|           Fantasy|3.4910005070136894|
|           Mystery| 3.632460255407871|
|           Musical|3.5636781053649105|
|         Animation|3.6299370349170004|
|         Film-Noir| 3.920114942528736|
|(no genres listed)|3.4893617021276597|
|              IMAX| 3.618335343787696|
|            Horror| 3.258195034974626|
|           Western| 3.583937823834197|
|            Comedy|3.3847207640898267|
|          Children| 3.412956125108601|
|            Action| 3.447984331646809|
|            Sci-Fi| 3.455721162210752|
+------------------+------------------+


### Exercise 2: Read data from JDBC and Hive tables

1. Create 3 dataframes (payment_transaction, account and customer) in Spark from the corresponding tables in demo_test.
2. Save payment_transaction, account and customer as Parquet files in HDFS.
3. Save the payment_transaction dataframe as a Hive table. Verify that the payment_transaction table is also accessible in Hive.
4. Delete the payment_transaction table from Hive.

### 1. Create 3 dataframes (payment_transaction, account and customer) in Spark from the corresponding tables in demo_test

In [14]:
# Connection settings shared by the three JDBC reads.
# Credentials are hard-coded for this exercise; prefer environment variables or a secrets store in real code.
jdbc_options = {
    "url": "jdbc:postgresql://introduction-01-intro-ap-southeast-1-dev-introduction-db.cpfm8ml2cxp2.ap-southeast-1.rds.amazonaws.com:5432/postgres",
    "user": "postgres",
    "password": "postgres123",
    "driver": "org.postgresql.Driver",
}

payment_transaction = spark.read.format("jdbc").options(**jdbc_options) \
    .option("dbtable", "ai4e_test.payment_transaction").load()

account = spark.read.format("jdbc").options(**jdbc_options) \
    .option("dbtable", "ai4e_test.account").load()

customer = spark.read.format("jdbc").options(**jdbc_options) \
    .option("dbtable", "ai4e_test.customer").load()
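One way to keep the password out of the notebook is to assemble the JDBC options from environment variables. A sketch under that assumption: the variable names `PG_URL`, `PG_USER` and `PG_PASSWORD` and both helper functions are hypothetical, and the actual database read is left commented out because it needs a reachable server.

```python
import os

def jdbc_options_from_env(prefix="PG_"):
    """Spark JDBC options built from hypothetical PG_URL / PG_USER / PG_PASSWORD variables."""
    return {
        "url": os.environ[prefix + "URL"],
        "user": os.environ[prefix + "USER"],
        "password": os.environ[prefix + "PASSWORD"],
        "driver": "org.postgresql.Driver",
    }

def read_jdbc_table(spark, table, options):
    """Read one database table (e.g. "ai4e_test.account") as a DataFrame."""
    return spark.read.format("jdbc").options(**options).option("dbtable", table).load()

# Usage against a live database (not executed here):
# opts = jdbc_options_from_env()
# payment_transaction, account, customer = (
#     read_jdbc_table(spark, f"ai4e_test.{name}", opts)
#     for name in ("payment_transaction", "account", "customer")
# )
```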

### 2. Save payment_transaction, account and customer as Parquet files in HDFS

In [15]:
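# Relative paths resolve against the default filesystem (fs.defaultFS):
# HDFS on a Hadoop cluster, local disk in a standalone or Docker setup.
payment_transaction.write.mode("overwrite").parquet("payment_transaction.parquet")
account.write.mode("overwrite").parquet("account.parquet")
customer.write.mode("overwrite").parquet("customer.parquet")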

### 3. Save the payment_transaction dataframe as a Hive table. Verify that the payment_transaction table is also accessible in Hive.

In [22]:
payment_transaction.write.mode("overwrite").saveAsTable("payment_transaction")
spark.sql("SELECT * FROM payment_transaction").show()

+--------------------+--------------------+--------------+------+-------------+-------------------+------------+
|            trans_id|              acc_id|before_balance|amount|after_balance|   transaction_time|payment_code|
+--------------------+--------------------+--------------+------+-------------+-------------------+------------+
|c3fedcdb7c388930d...|7858bc9db331ac3e3...|        165554|  -974|       164580|2023-08-01 02:20:00|           3|
|c3fedcdb7c388930d...|7858bc9db331ac3e3...|        164580|  -242|       164338|2023-08-01 04:35:00|           2|
|c3fedcdb7c388930d...|7858bc9db331ac3e3...|        164338|   459|       164797|2023-08-01 07:39:00|           1|
|c3fedcdb7c388930d...|7858bc9db331ac3e3...|        164797|   753|       165550|2023-08-01 11:03:00|           2|
|c3fedcdb7c388930d...|7858bc9db331ac3e3...|        165550|  -736|       164814|2023-08-01 13:12:00|           2|
|c3fedcdb7c388930d...|7858bc9db331ac3e3...|        164814|   354|       165168|2023-08-02 02:05:

### 4. Delete the payment_transaction table from Hive.

In [23]:
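# Commented out: Exercise 3 below still reads this table.
# spark.sql("DROP TABLE IF EXISTS payment_transaction")
# After the drop, this query should fail with an AnalysisException (table or view not found).
# spark.sql("SELECT * FROM payment_transaction").show()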

### Exercise 3: Data partitioning

1. Create a dataframe in Spark that refers to the Hive table payment_transaction.
2. Find the total number of rows.
3. Parse the time column as a date/time.
4. Save the payment_transaction data partitioned by year and month, based on the time field that you parsed.
5. Reload the partitioned dataset and verify that the number of records matches the original.


### 1. Create a dataframe in Spark that refers to the Hive table payment_transaction.

In [26]:
payment_transaction = spark.read.table("payment_transaction")

### 2. Find the total number of rows.

In [27]:
print(payment_transaction.count())

100387


### 3. Parse the time column as a date/time

In [36]:
# to_date keeps only the calendar date; F.to_timestamp would also keep the time of day.
payment_transaction = payment_transaction.withColumn("transaction_time", F.to_date("transaction_time", "yyyy-MM-dd"))
payment_transaction.show()

+--------------------+--------------------+--------------+------+-------------+----------------+------------+
|            trans_id|              acc_id|before_balance|amount|after_balance|transaction_time|payment_code|
+--------------------+--------------------+--------------+------+-------------+----------------+------------+
|c3fedcdb7c388930d...|7858bc9db331ac3e3...|        165554|  -974|       164580|      2023-08-01|           3|
|c3fedcdb7c388930d...|7858bc9db331ac3e3...|        164580|  -242|       164338|      2023-08-01|           2|
|c3fedcdb7c388930d...|7858bc9db331ac3e3...|        164338|   459|       164797|      2023-08-01|           1|
|c3fedcdb7c388930d...|7858bc9db331ac3e3...|        164797|   753|       165550|      2023-08-01|           2|
|c3fedcdb7c388930d...|7858bc9db331ac3e3...|        165550|  -736|       164814|      2023-08-01|           2|
|c3fedcdb7c388930d...|7858bc9db331ac3e3...|        164814|   354|       165168|      2023-08-02|           2|
|c3fedcdb7

### 4. Save the payment_transaction data partitioned by year and month based on the time field that you parsed

In [38]:
# Note: this creates one partition directory per distinct date, not one per year and month.
payment_transaction.write.partitionBy("transaction_time").mode("overwrite").parquet("payment_transaction_partitioned.parquet")

### 5. Reload the partitioned dataset and verify that the number of records matches the original.

In [40]:
payment_transaction_partitioned = spark.read.parquet("payment_transaction_partitioned.parquet")
payment_transaction_partitioned.show()

+--------------------+--------------------+--------------+------+-------------+------------+----------------+
|            trans_id|              acc_id|before_balance|amount|after_balance|payment_code|transaction_time|
+--------------------+--------------------+--------------+------+-------------+------------+----------------+
|c3fedcdb7c388930d...|7858bc9db331ac3e3...|        165554|  -974|       164580|           3|      2023-08-01|
|c3fedcdb7c388930d...|7858bc9db331ac3e3...|        164580|  -242|       164338|           2|      2023-08-01|
|c3fedcdb7c388930d...|7858bc9db331ac3e3...|        164338|   459|       164797|           1|      2023-08-01|
|c3fedcdb7c388930d...|7858bc9db331ac3e3...|        164797|   753|       165550|           2|      2023-08-01|
|c3fedcdb7c388930d...|7858bc9db331ac3e3...|        165550|  -736|       164814|           2|      2023-08-01|
|c3fedcdb7c388930d...|7858bc9db331ac3e3...|        107483|   978|       108461|           2|      2023-08-01|
|c3fedcdb7

### Exercise 4: Data format

Save the payment_transaction dataset in the following formats and compare their sizes on disk:

1. CSV
2. JSON
3. Parquet

 - CSV file format: ? MB
 - CSV gzip compressed: ? MB
 - JSON uncompressed: ? MB
 - Parquet snappy compressed: ? MB

### 1. CSV

In [41]:
payment_transaction.write.mode("overwrite").csv("payment_transaction.csv")

In [43]:
!du -sh payment_transaction.csv

'du' is not recognized as an internal or external command,
operable program or batch file.
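The error comes from running the notebook on Windows, where `du` and `gzip` are not available in the default shell. A portable stand-in for `du -sh` in plain Python; it sums apparent file sizes (including Spark's `_SUCCESS` and `.crc` files), which can differ slightly from the disk blocks `du` reports. The demo directory is a throwaway.

```python
import os
import tempfile

def dir_size_mb(path):
    """Total apparent size of all files under `path`, in MiB."""
    total = 0
    for root, _dirs, files in os.walk(path):
        for name in files:
            total += os.path.getsize(os.path.join(root, name))
    return total / (1024 * 1024)

# Demo on a throwaway directory holding one 2 MiB file.
demo = tempfile.mkdtemp()
with open(os.path.join(demo, "part-00000.csv"), "wb") as f:
    f.write(b"\0" * (2 * 1024 * 1024))
print(f"{dir_size_mb(demo):.1f} MB")  # 2.0 MB
```

In the notebook, `dir_size_mb("payment_transaction.csv")` would replace `!du -sh payment_transaction.csv`.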


In [45]:
# payment_transaction.csv is a directory of part files, so gzip needs -r (Unix shells only).
!gzip -r payment_transaction.csv

'gzip' is not recognized as an internal or external command,
operable program or batch file.


### 2. JSON

In [46]:
payment_transaction.write.mode("overwrite").json("payment_transaction.json")

In [47]:
!du -sh payment_transaction.json

'du' is not recognized as an internal or external command,
operable program or batch file.


### 3. Parquet

In [48]:
payment_transaction.write.mode("overwrite").parquet("payment_transaction.parquet")

In [49]:
!du -sh payment_transaction.parquet

'du' is not recognized as an internal or external command,
operable program or batch file.
