In [385]:
from pyspark.sql import SparkSession
import time
import pandas as pd
from tqdm import tqdm
import os

# Блок 2. Работа с данными на Spark

In [386]:
# Create (or reuse) the Spark session for this notebook.
# The standalone master URL was hard-coded to one machine's hostname; it can now
# be overridden with the SPARK_MASTER_URL environment variable (a name chosen for
# this notebook), while the previous URL remains the default.
spark = (
    SparkSession.builder
    .master(os.environ.get("SPARK_MASTER_URL", "spark://Daniils-MacBook-Pro.local:7077"))
    .appName("App")
    .getOrCreate()
)

In [488]:
from os import listdir
from os.path import isfile, join

# Directory that holds the raw input files.
mypath = "./archive"

# Paths of all regular files directly inside `mypath` (sub-directories are skipped).
# os.listdir returns entries in arbitrary order, so the names are sorted to make
# the result (and anything that picks "the first file") reproducible between runs.
onlyfiles = [join(mypath, f) for f in sorted(listdir(mypath)) if isfile(join(mypath, f))]

In [489]:
def _read_books_csv(path):
    """Read one book CSV file into a Spark DataFrame.

    The file is read with a header row, multi-line field support, and `"` used
    both as the quote character and as the escape character inside quoted values.
    """
    return (
        spark.read
        .option("header", "true")
        .option("multiLine", "true")
        .option("quote", "\"")
        .option("escape", "\"")
        .csv(path)
    )


# Keep only the files whose path starts with "<mypath>/book".
book_paths = [file for file in onlyfiles if file.startswith(join(mypath, "book"))]

# Fail with a clear message instead of an IndexError on book_paths[0].
if not book_paths:
    raise FileNotFoundError(f"no files starting with {join(mypath, 'book')!r} were found")

# Union all files by column name; columns missing from a file are filled with nulls.
df = _read_books_csv(book_paths[0])

for path in tqdm(book_paths[1:]):
    df = df.unionByName(_read_books_csv(path), allowMissingColumns=True)

100%|███████████████████████████████████████████| 22/22 [00:06<00:00,  3.29it/s]


In [490]:
# Persist the combined data as '&'-delimited CSV with a header row.
# quote/escape are set explicitly to '"' so that embedded quotes are written the
# same way the notebook's CSV readers (escape='"') expect them; the writer's
# default escape character is a backslash, which those readers would not undo.
(
    df.write
    .mode("overwrite")
    .option("header", True)
    .option("delimiter", "&")
    .option("quote", "\"")
    .option("escape", "\"")
    .csv("df.csv")
)

                                                                                

In [491]:
# Persist the combined data as Parquet, replacing any previous output.
# The CSV-only options ("header", "delimiter") were dropped: Parquet is a binary,
# self-describing format and has no header row or field delimiter.
df.write.mode("overwrite").parquet("df.parquet")

                                                                                

In [562]:
# read csv
%time
df = spark.read.option("header", "true").option("delimiter", "&").option("multiLine", "true").option("quote", "\"").option("escape", "\"").csv("df.csv")


CPU times: user 7 µs, sys: 2 µs, total: 9 µs
Wall time: 14.1 µs


In [561]:
# read parquet
%time
start_time = time.time()
df = spark.read.option("header", "true").option("delimiter", "&").option("multiLine", "true").option("quote", "\"").option("escape", "\"").parquet("df.parquet")

stop_time = time.time()
stop_time - start_time

CPU times: user 4 µs, sys: 2 µs, total: 6 µs
Wall time: 11 µs


0.17525506019592285

In [495]:
# Shell out to `du` to report the total on-disk size of df.parquet
# (-s: single summarized total, -h: human-readable units).
!du -sh "df.parquet"

709M	df.parquet


In [496]:
# Shell out to `du` to report the total on-disk size of df.csv
# (-s: single summarized total, -h: human-readable units).
!du -sh "df.csv"

1.1G	df.csv


In [None]:
# Parquet загружается примерно в 3 раза быстрее и занимает примерно в 1,5 раза меньше места (709M против 1.1G).

# Get data

In [479]:
from pyspark.sql.types import DoubleType, IntegerType

In [542]:
from pyspark.sql.functions import col, expr, udf
from pyspark.sql.types import IntegerType

# Load the Parquet copy of the data. CSV reader options do not apply to Parquet
# and were removed.
df = spark.read.parquet("df.parquet")

df = df.withColumn("CountsOfReview", col("CountsOfReview").cast(IntegerType()))

# RatingDistTotal
# Drop the first 6 characters (presumably a "total:" prefix - confirm against the
# raw data) and cast the remainder to int. The built-in SQL substring replaces the
# previous Python UDF: it runs inside the JVM and yields NULL for NULL input
# instead of raising in the Python worker.
df = df.withColumn("RatingDistTotal", expr("substring(RatingDistTotal, 7)").cast("int"))

# Rating
df = df.withColumn("Rating", col("Rating").cast("float"))

# RatingDist1 .. RatingDist5: drop the first 2 characters (presumably an "<n>:"
# prefix - confirm against the raw data) and cast the remainder to int.
# The loop variable is deliberately not named `col`, so it no longer shadows
# pyspark.sql.functions.col for the rest of the notebook.
cols = ["RatingDist" + str(i) for i in range(1, 6)]

for rating_col in cols:
    df = df.withColumn(rating_col, expr(f"substring({rating_col}, 3)").cast("int"))

In [543]:
# Preview: the ten rows with the highest CountsOfReview, as a pandas DataFrame.
df.sort(df["CountsOfReview"].desc()).limit(10).toPandas()

                                                                                

Unnamed: 0,Id,Name,Authors,ISBN,Rating,PublishYear,PublishMonth,PublishDay,Publisher,RatingDist5,RatingDist4,RatingDist3,RatingDist2,RatingDist1,RatingDistTotal,CountsOfReview,Language,PagesNumber,Description,Count of text reviews
0,2767052,"The Hunger Games (The Hunger Games, #1)",Suzanne Collins,439023483.0,4.33,2008,9,14,Scholastic Press,3410571,1902780,737951,170029,92146,6313477,154447,eng,374,<b>WINNING MEANS FAME AND FORTUNE.<br />LOSING...,
1,41865,"Twilight (Twilight, #1)",Stephenie Meyer,316015849.0,3.59,2006,6,9,"Little, Brown and Company",1662040,1059392,961688,520599,531054,4734773,94850,eng,501,,
2,19063,The Book Thief,Markus Zusak,375831002.0,4.37,2006,14,3,Alfred A. Knopf,985221,495431,176209,45995,24330,1727186,87685,eng,552,,
3,4667024,The Help,Kathryn Stockett,,4.46,2009,2,10,Amy Einhorn Books/G.P. Putnam's Sons,1307738,639237,177792,35031,16266,2176064,76040,eng,451,<i>Librarian's note: An alternate cover editio...,
4,3,Harry Potter and the Sorcerer's Stone (Harry P...,J.K. Rowling,,4.47,2003,1,11,Scholastic Inc,4268227,1513191,567458,130310,108202,6587388,75911,eng,309,,
5,3636,"The Giver (The Giver, #1)",Lois Lowry,385732554.0,4.13,2006,24,1,Ember,739329,558893,277526,73091,32692,1681531,57034,eng,208,,
6,43641,Water for Elephants,Sara Gruen,1565125606.0,4.09,2007,1,5,Algonquin Books,522497,494509,233922,57353,19553,1327834,52918,eng,335,,
7,2429135,"The Girl with the Dragon Tattoo (Millennium, #1)",Stieg Larsson,,4.14,2008,9,16,Knopf,1165662,863725,362253,108747,77562,2577949,52225,eng,465,"Harriet Vanger, a scion of one of Sweden’s wea...",
8,136251,Harry Potter and the Deathly Hallows (Harry Po...,J.K. Rowling,545010225.0,4.62,2007,21,7,Arthur A. Levine Books / Scholastic Inc.,1932990,526196,154874,33025,21838,2668923,52088,eng,759,,
9,28187,The Lightning Thief (Percy Jackson and the Oly...,Rick Riordan,786838655.0,4.25,2006,1,3,Disney Hyperion Books,935588,567840,275541,60890,25101,1864960,48630,eng,375,,


In [498]:
# a) Top 10 books with the largest number of reviews
(
    df
    .sort("CountsOfReview", ascending=False)
    .limit(10)
    .toPandas()
)

                                                                                

Unnamed: 0,Id,Name,Authors,ISBN,Rating,PublishYear,PublishMonth,PublishDay,Publisher,RatingDist5,RatingDist4,RatingDist3,RatingDist2,RatingDist1,RatingDistTotal,CountsOfReview,Language,PagesNumber,Description,Count of text reviews
0,2767052,"The Hunger Games (The Hunger Games, #1)",Suzanne Collins,439023483.0,4.33,2008,9,14,Scholastic Press,3410571,1902780,737951,170029,92146,6313477,154447,eng,374,<b>WINNING MEANS FAME AND FORTUNE.<br />LOSING...,
1,41865,"Twilight (Twilight, #1)",Stephenie Meyer,316015849.0,3.59,2006,6,9,"Little, Brown and Company",1662040,1059392,961688,520599,531054,4734773,94850,eng,501,,
2,19063,The Book Thief,Markus Zusak,375831002.0,4.37,2006,14,3,Alfred A. Knopf,985221,495431,176209,45995,24330,1727186,87685,eng,552,,
3,4667024,The Help,Kathryn Stockett,,4.46,2009,2,10,Amy Einhorn Books/G.P. Putnam's Sons,1307738,639237,177792,35031,16266,2176064,76040,eng,451,<i>Librarian's note: An alternate cover editio...,
4,3,Harry Potter and the Sorcerer's Stone (Harry P...,J.K. Rowling,,4.47,2003,1,11,Scholastic Inc,4268227,1513191,567458,130310,108202,6587388,75911,eng,309,,
5,3636,"The Giver (The Giver, #1)",Lois Lowry,385732554.0,4.13,2006,24,1,Ember,739329,558893,277526,73091,32692,1681531,57034,eng,208,,
6,43641,Water for Elephants,Sara Gruen,1565125606.0,4.09,2007,1,5,Algonquin Books,522497,494509,233922,57353,19553,1327834,52918,eng,335,,
7,2429135,"The Girl with the Dragon Tattoo (Millennium, #1)",Stieg Larsson,,4.14,2008,9,16,Knopf,1165662,863725,362253,108747,77562,2577949,52225,eng,465,"Harriet Vanger, a scion of one of Sweden’s wea...",
8,136251,Harry Potter and the Deathly Hallows (Harry Po...,J.K. Rowling,545010225.0,4.62,2007,21,7,Arthur A. Levine Books / Scholastic Inc.,1932990,526196,154874,33025,21838,2668923,52088,eng,759,,
9,28187,The Lightning Thief (Percy Jackson and the Oly...,Rick Riordan,786838655.0,4.25,2006,1,3,Disney Hyperion Books,935588,567840,275541,60890,25101,1864960,48630,eng,375,,


In [486]:
# b) Top 10 publishers with the largest average number of pages per book
(
    df
    .groupBy("Publisher")
    .agg({"PagesNumber": "mean"})  # result column is named "avg(PagesNumber)"
    .sort("avg(PagesNumber)", ascending=False)
    .limit(10)
    .toPandas()
)

                                                                                

Unnamed: 0,Publisher,avg(PagesNumber)
0,Crafty Secrets Publications,1807322.0
1,Sacred-texts.com,500000.0
2,Department of Russian Language and Literature ...,322128.6
3,Logos Research Systems,100000.0
4,"Encyclopedia Britannica, Incorporated",32642.0
5,Progressive Management,19106.36
6,Still Waters Revival Books,10080.14
7,"P. Shalom Publications, Incorporated",8539.0
8,"Hendrickson Publishers, Inc. (Peabody, MA)",6448.0
9,IEEE/EMB,6000.0


In [487]:
# c) The ten most active years by number of published books
(
    df
    .groupBy("PublishYear")
    .count()  # adds a "count" column with the number of rows per year
    .sort("count", ascending=False)
    .limit(10)
    .toPandas()
)

                                                                                

Unnamed: 0,PublishYear,count
0,2007,129507
1,2006,122374
2,2005,117639
3,2004,105733
4,2003,104345
5,2002,95537
6,2001,88228
7,2000,87290
8,2008,80265
9,1999,80155


In [544]:
# d) Top 10 books with the largest spread in ratings among books with more than 500 ratings

from pyspark.sql.functions import col

# Variance of the star ratings, using the Rating column as the mean:
#   sum over stars 5..1 of (stars - Rating)^2 * RatingDist<stars>, divided by RatingDistTotal.
# The terms are accumulated in the order 5, 4, 3, 2, 1.
weighted_sq_dev = None
for stars in range(5, 0, -1):
    term = (stars - col("Rating")) ** 2 * col(f"RatingDist{stars}")
    weighted_sq_dev = term if weighted_sq_dev is None else weighted_sq_dev + term

df1 = df.withColumn("Variance", weighted_sq_dev / col("RatingDistTotal"))

In [545]:
# Ten books with the highest rating variance among those with more than 500
# ratings in total, shown with the columns the variance was derived from.
(
    df1
    .where(col("RatingDistTotal") > 500)
    .select(
        "Variance",
        "Name",
        "RatingDist5",
        "RatingDist4",
        "RatingDist3",
        "RatingDist2",
        "RatingDist1",
        "Rating",
        "RatingDistTotal",
    )
    .sort("Variance", ascending=False)
    .limit(10)
    .toPandas()
)

                                                                                

Unnamed: 0,Variance,Name,RatingDist5,RatingDist4,RatingDist3,RatingDist2,RatingDist1,Rating,RatingDistTotal
0,2.811982,Scientology: The Fundamentals of Thought,218,58,86,109,382,2.56,853
1,2.807018,Scientology: The Fundamentals of Thought,212,56,83,107,380,2.54,838
2,2.755286,Para Entrenar a un Nino: To Train Up a Child,448,282,222,106,598,2.93,1656
3,2.754065,To Train Up a Child,447,279,220,105,585,2.94,1636
4,2.753906,Para Entrenar a un Nino: To Train Up a Child,447,278,221,104,584,2.94,1634
5,2.52276,The Bluebook: A Uniform System Citation,161,75,93,67,146,3.07,542
6,2.52276,The Bluebook: A Uniform System of Citation,161,75,93,67,146,3.07,542
7,2.440198,Dianetics: The Modern Science of Mental Health...,574,169,361,493,1300,2.39,2897
8,2.436404,Dianetics: The Modern Science of Mental Health,568,163,358,491,1289,2.38,2869
9,2.434722,Dianetica: La Ciencia Moderna de la Salud Mental,565,163,358,488,1286,2.38,2860


In [557]:
# e) Any interesting insight from the data

# The query groups by publisher and publication year and takes the maximum
# rating among the books in each group, considering only books with more than
# 50 ratings in total (RatingDistTotal > 50).

# This shows which publishers had their best-rated year.
(
    df
    .where(col("RatingDistTotal") > 50)
    .groupBy("Publisher", "PublishYear")
    .agg({"Rating": "max"})  # result column is named "max(Rating)"
    .sort("max(Rating)", ascending=False)
    .limit(10)
    .toPandas()
)

                                                                                

Unnamed: 0,Publisher,PublishYear,max(Rating)
0,Belknap Press,1972,5.0
1,Sandcastle Publishing LLC,2005,4.94
2,"Healing Society, Inc",2005,4.93
3,Healing Society,2007,4.91
4,Alfred Publishing Co.,2008,4.9
5,Warner Bros Pubns,1984,4.9
6,Suhrkamp,2003,4.89
7,Baha'i Publishing Trust,2007,4.89
8,G. Ronald,1976,4.89
9,"Oxford University Press, USA",1991,4.89


# Блок 3. Spark Streaming