In [1]:
import os
import time
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# 1. Csv VS Parquet

## Connect to Spark

In [2]:
spark = SparkSession.builder\
    .appName("pyspark")\
    .master("spark://spark-master:7077")\
    .config("spark.executor.memory", "1g")\
    .getOrCreate()

23/04/10 17:21:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
book_schema = StructType([
    StructField("Id", IntegerType()),
    StructField("Name", StringType()),
    StructField("PagesNumber", IntegerType()),
    StructField("Authors", StringType()),
    StructField("Publisher", StringType()),
    StructField("Language", StringType()),
    StructField("Rating", DoubleType()),
    StructField("RatingDist1", IntegerType()),
    StructField("RatingDist2", IntegerType()),
    StructField("RatingDist3", IntegerType()),
    StructField("RatingDist4", IntegerType()),
    StructField("RatingDist5", IntegerType()),
    StructField("RatingDistTotal", IntegerType()),
    StructField("CountsOfReview", IntegerType()),
    StructField("PublishDay", IntegerType()),
    StructField("PublishMonth", IntegerType()),
    StructField("PublishYear", IntegerType()),
    StructField("ISBN", StringType())
])

user_schema = StructType([
    StructField("ID", IntegerType()),
    StructField("Name", StringType()),
    StructField("Rating", StringType())
])

## Read data

In [4]:
def process_book_csv(df: pd.DataFrame):
    if 'PagesNumber' not in df.columns:
        df = df.rename(columns={"pagesNumber": "PagesNumber"})

    df['RatingDist1'] = df['RatingDist1'].str[2:].astype(int)
    df['RatingDist2'] = df['RatingDist2'].str[2:].astype(int)
    df['RatingDist3'] = df['RatingDist3'].str[2:].astype(int)
    df['RatingDist4'] = df['RatingDist4'].str[2:].astype(int)
    df['RatingDist5'] = df['RatingDist5'].str[2:].astype(int)
    df['RatingDistTotal'] = df['RatingDistTotal'].str[6:].astype(int)

    df = df.loc[:, [x.name for x in book_schema.fields]]
    return spark.createDataFrame(df, schema=book_schema)

def process_user_csv(df: pd.DataFrame):
    df = df.loc[:, [x.name for x in user_schema.fields]]
    return spark.createDataFrame(df, schema=user_schema)

In [6]:
book_df = spark.createDataFrame([], schema=book_schema)
user_df = spark.createDataFrame([], schema=user_schema)

for path, _, files in os.walk('./data/'):
    for filename in sorted(files):
        csv_path = os.path.join(path, filename)
        print("Reading csv: {}".format(filename))
        csv_df = pd.read_csv(csv_path)

        if "book" in filename:
            book_df = book_df.union(process_book_csv(csv_df))
        elif "user" in filename:
            user_df = user_df.union(process_user_csv(csv_df))

Reading csv: book1-100k.csv
Reading csv: book1000k-1100k.csv
Reading csv: book100k-200k.csv
Reading csv: book1100k-1200k.csv
Reading csv: book1200k-1300k.csv
Reading csv: book1300k-1400k.csv
Reading csv: book1400k-1500k.csv
Reading csv: book1500k-1600k.csv
Reading csv: book1600k-1700k.csv
Reading csv: book1700k-1800k.csv
Reading csv: book1800k-1900k.csv
Reading csv: book1900k-2000k.csv
Reading csv: book2000k-3000k.csv
Reading csv: book200k-300k.csv
Reading csv: book3000k-4000k.csv
Reading csv: book300k-400k.csv
Reading csv: book4000k-5000k.csv
Reading csv: book400k-500k.csv
Reading csv: book500k-600k.csv
Reading csv: book600k-700k.csv
Reading csv: book700k-800k.csv
Reading csv: book800k-900k.csv
Reading csv: book900k-1000k.csv
Reading csv: user_rating_0_to_1000.csv
Reading csv: user_rating_1000_to_2000.csv
Reading csv: user_rating_2000_to_3000.csv
Reading csv: user_rating_3000_to_4000.csv
Reading csv: user_rating_4000_to_5000.csv
Reading csv: user_rating_5000_to_6000.csv
Reading csv: u

In [7]:
book_df.show(10)

23/04/10 17:23:54 WARN TaskSetManager: Stage 1 contains a task of very large size (1820 KiB). The maximum recommended task size is 1000 KiB.

+---+--------------------+-----------+--------------------+---------------+--------+------------------+-----------+-----------+-----------+-----------+-----------+---------------+--------------+----------+------------+-----------+----------+
| Id|                Name|PagesNumber|             Authors|      Publisher|Language|            Rating|RatingDist1|RatingDist2|RatingDist3|RatingDist4|RatingDist5|RatingDistTotal|CountsOfReview|PublishDay|PublishMonth|PublishYear|      ISBN|
+---+--------------------+-----------+--------------------+---------------+--------+------------------+-----------+-----------+-----------+-----------+-----------+---------------+--------------+----------+------------+-----------+----------+
|  1|Harry Potter and ...|        652|        J.K. Rowling|Scholastic Inc.|     eng|              4.57|       9896|      25317|     159960|     556485|    1546466|        2298124|         28062|         9|          16|       2006|       NaN|
|  2|Harry Potter and ...|      

                                                                                

## Write data to HDFS

In [8]:
book_df.write.option("header", True)\
    .mode("overwrite")\
    .csv("hdfs://hadoop-namenode:9000/books/data.csv")

book_df.write.option("header", True)\
    .mode("overwrite")\
    .parquet("hdfs://hadoop-namenode:9000/books/data.parquet")

user_df.write.option("header", True)\
    .mode("overwrite")\
    .parquet("hdfs://hadoop-namenode:9000/users/data.parquet")

23/04/10 17:24:02 WARN TaskSetManager: Stage 2 contains a task of very large size (1820 KiB). The maximum recommended task size is 1000 KiB.
23/04/10 17:24:11 WARN TaskSetManager: Stage 3 contains a task of very large size (1820 KiB). The maximum recommended task size is 1000 KiB.
23/04/10 17:24:27 WARN TaskSetManager: Stage 4 contains a task of very large size (1241 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

## Compare read speed

In [9]:
csv_start = time.perf_counter()
csv_load = spark.read.csv("hdfs://hadoop-namenode:9000/books/data.csv")
csv_finish = time.perf_counter()

parquet_start = time.perf_counter()
parquet_load = spark.read.parquet("hdfs://hadoop-namenode:9000/books/data.parquet")
parquet_finish = time.perf_counter()

print(f"Csv read complete in {csv_finish - csv_start:0.4f} seconds")
print(f"Parquet read complete in {parquet_finish - parquet_start:0.4f} seconds")

Csv read complete in 0.7820 seconds
Parquet read complete in 0.0904 seconds


# 2. Data Analysis

## a) Top 10 books by reviews count

In [10]:
book_df\
    .select("Name", "CountsOfReview")\
    .orderBy(desc("CountsOfReview"))\
    .show(10)

23/04/10 17:24:42 WARN TaskSetManager: Stage 8 contains a task of very large size (1820 KiB). The maximum recommended task size is 1000 KiB.

+--------------------+--------------+
|                Name|CountsOfReview|
+--------------------+--------------+
|The Hunger Games ...|        154447|
|Twilight (Twiligh...|         94850|
|      The Book Thief|         87685|
|            The Help|         76040|
|Harry Potter and ...|         75911|
|The Giver (The Gi...|         57034|
| Water for Elephants|         52918|
|The Girl with the...|         52225|
|Harry Potter and ...|         52088|
|The Lightning Thi...|         48630|
+--------------------+--------------+
only showing top 10 rows



                                                                                

## b) Top 10 publishers by average pages count

In [11]:
book_df\
    .groupBy("Publisher")\
    .avg("PagesNumber")\
    .orderBy(avg("PagesNumber").desc())\
    .show(10)

23/04/10 17:24:46 WARN TaskSetManager: Stage 9 contains a task of very large size (1820 KiB). The maximum recommended task size is 1000 KiB.

+--------------------+------------------+
|           Publisher|  avg(PagesNumber)|
+--------------------+------------------+
|Crafty Secrets Pu...|         1807321.6|
|    Sacred-texts.com|          500000.0|
|Department of Rus...| 322128.5714285714|
|Logos Research Sy...|          100000.0|
|Encyclopedia Brit...|           32642.0|
|Progressive Manag...|        19106.3625|
|Still Waters Revi...|10080.142857142857|
|P. Shalom Publica...|            8539.0|
|Hendrickson Publi...|            6448.0|
|            IEEE/EMB|            6000.0|
+--------------------+------------------+
only showing top 10 rows



                                                                                

## c) Top 10 years by published books count

In [12]:
book_df\
    .groupBy("PublishYear")\
    .count()\
    .orderBy(count("*").desc())\
    .show(10)

23/04/10 17:24:52 WARN TaskSetManager: Stage 11 contains a task of very large size (1820 KiB). The maximum recommended task size is 1000 KiB.

+-----------+------+
|PublishYear| count|
+-----------+------+
|       2007|129507|
|       2006|122374|
|       2005|117639|
|       2004|105733|
|       2003|104345|
|       2002| 95537|
|       2001| 88228|
|       2000| 87290|
|       2008| 80265|
|       1999| 80155|
+-----------+------+
only showing top 10 rows



                                                                                

## d) Top 10 books with more than 500 ratings by rating variance

In [13]:
book_df\
    .filter(book_df.RatingDistTotal > 500)\
    .withColumn("Variance",
       (book_df.RatingDist1 * (1 - book_df.Rating) ** 2 +
        book_df.RatingDist2 * (2 - book_df.Rating) ** 2 +
        book_df.RatingDist3 * (3 - book_df.Rating) ** 2 +
        book_df.RatingDist4 * (4 - book_df.Rating) ** 2 +
        book_df.RatingDist5 * (5 - book_df.Rating) ** 2) / book_df.RatingDistTotal)\
    .select("Name", "Variance")\
    .orderBy(col("Variance").desc())\
    .show(10)

23/04/10 17:24:56 WARN TaskSetManager: Stage 13 contains a task of very large size (1820 KiB). The maximum recommended task size is 1000 KiB.

+--------------------+------------------+
|                Name|          Variance|
+--------------------+------------------+
|Scientology: The ...|2.8119821805392733|
|Scientology: The ...| 2.807017661097852|
|Para Entrenar a u...|2.7552864734299516|
| To Train Up a Child| 2.754064547677261|
|Para Entrenar a u...|2.7539059975520193|
|The Bluebook: A U...| 2.522759778597786|
|The Bluebook: A U...| 2.522759778597786|
|Dianetics: The Mo...| 2.440198032447359|
|Dianetics: The Mo...|2.4364041826420357|
|Dianetica: La Cie...|2.4347216783216785|
+--------------------+------------------+
only showing top 10 rows



                                                                                

## e) Harry and Potter books statistics

In [14]:
book_df.filter(book_df.Name.contains("Harry")).select(lit("Harry").alias("Title contains"), count("*").alias("Count"), avg(book_df.Rating).alias("Average rating"))\
    .union(book_df.filter(book_df.Name.contains("Potter")).select(lit("Potter"), count("*"), avg(book_df.Rating)))\
    .union(book_df.filter(book_df.Name.contains("Harry") & book_df.Name.contains("Potter")).select(lit("Harry Potter"), count("*"), avg(book_df.Rating)))\
    .show()

23/04/10 17:25:26 WARN TaskSetManager: Stage 14 contains a task of very large size (1820 KiB). The maximum recommended task size is 1000 KiB.
23/04/10 17:25:28 WARN TaskSetManager: Stage 15 contains a task of very large size (1820 KiB). The maximum recommended task size is 1000 KiB.
23/04/10 17:25:30 WARN TaskSetManager: Stage 16 contains a task of very large size (1820 KiB). The maximum recommended task size is 1000 KiB.

+--------------+-----+-----------------+
|Title contains|Count|   Average rating|
+--------------+-----+-----------------+
|         Harry| 1321|3.613557910673732|
|        Potter| 1281|3.279008587041375|
|  Harry Potter|  405|4.138024691358027|
+--------------+-----+-----------------+



                                                                                

# 3. Spark Streaming

In [15]:
from itertools import chain

In [16]:
rating_map = {
    "it was amazing" : 5,
    "really liked it" : 4,
    "liked it" : 3,
    "it was ok" : 2,
    "did not like it" : 1
}

mapping_expr = create_map([lit(x) for x in chain(*rating_map.items())])

In [17]:
stream_df = spark.readStream\
    .schema(user_schema)\
    .parquet("hdfs://hadoop-namenode:9000/users/data.parquet")\
    .filter(col("Rating") != "This user doesn't have any rating")\
    .withColumn("RatingValue", mapping_expr[col('Rating')])\
    .withColumn("timestamp", current_timestamp())\
    .withWatermark("timestamp", "10 seconds") \
    .groupBy("Name", window(col("timestamp"), "15 seconds"))\
    .agg(avg("RatingValue").alias("AverageRating"))

In [18]:
stream_df.writeStream\
    .format("parquet")\
    .option("header", True)\
    .option("path", "hdfs://hadoop-namenode:9000/streaming/res/book_ratings.parquet")\
    .option("checkpointLocation", "hdfs://hadoop-namenode:9000/streaming/checkpoints/book_ratings")\
    .start()\
    .awaitTermination()