# Spark session

In [1]:
from pyspark.sql import SparkSession

# Spark session & context
spark = (SparkSession
         .builder
         .master("local")
         .appName("load-postgres")
         # Add postgres jar
         .config("spark.driver.extraClassPath", "/home/jovyan/work/jars/postgresql-9.4.1207.jar")
         .getOrCreate())
sc = spark.sparkContext

21/11/01 03:17:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


# Read Json data

In [2]:
# read the books data from internet archive
df_books = (
    spark.read
    .format("json")
    .load("/home/jovyan/work/data/ol_cdump.json")
)

                                                                                

# Schema for the Dataframe

In [4]:
#schema
df_books.printSchema()

root
 |-- alternate_names: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- authors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- author: struct (nullable = true)
 |    |    |    |-- key: string (nullable = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- type: string (nullable = true)
 |-- bio: string (nullable = true)
 |-- birth_date: string (nullable = true)
 |-- by_statement: string (nullable = true)
 |-- contributions: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- contributors: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- role: string (nullable = true)
 |-- copyright_date: string (nullable = true)
 |-- covers: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- created: struct (nullable = true)
 |    |-- type: string (nullable = true)
 |    |-- value: stri

# No of records in Data Frame

In [5]:
# total number of records before DQC
df_books.repartition(4).count()

                                                                                

148163

# Data Quality Checker

In [6]:
# Quality checker and filter events
df_dqc=df_books.filter("(title is not null or length(title)>0) and number_of_pages>20 and publish_date between 1950 and current_date")

# Post DQC Count

In [7]:
# total records after the DQC filter
df_dqc.count()

                                                                                

76076

# Cleansed and exploded Author DF

In [8]:
# explode author and convert created_time to timestamp type

from pyspark.sql.functions import from_unixtime, col, to_timestamp
from pyspark.sql.functions import explode,regexp_replace

df_author_explode = (df_dqc
            .withColumn('created_time', to_timestamp("created.value"))
            .filter("created_time is not null")
            .select(explode("authors.key").alias("author"),"created_time","publish_date","title","number_of_pages")
            .filter("author is  NOT NULL")
            .select(regexp_replace('author', '/authors/', '').alias("author"),"created_time","publish_date","title","number_of_pages")
             )


In [28]:
df_author_explode.select("title","author","publish_date","created_time","number_of_pages").orderBy(col("publish_date").desc(),col("created_time").desc()).show(100)

[Stage 8:>                                                          (0 + 1) / 2]

+--------------------+----------+------------+--------------------+---------------+
|               title|    author|publish_date|        created_time|number_of_pages|
+--------------------+----------+------------+--------------------+---------------+
|   What's that bird?|OL1390027A|        2011|2009-02-17 21:36:...|            117|
|        Oral biology|OL7125007A|        2010|2012-05-16 07:31:...|            422|
|        Oral biology|OL5893410A|        2010|2012-05-16 07:31:...|            422|
|        Oral biology|OL7125008A|        2010|2012-05-16 07:31:...|            422|
|The Little, Brown...| OL387041A|        2010|2009-02-17 17:35:...|            965|
|The foreclosure o...|OL6569545A|        2009|2009-02-17 19:54:...|            354|
|Basic photographi...|OL6569535A|        2009|2009-02-17 19:54:...|            462|
|     Mrs Mismarriage|OL6569385A|        2009|2009-02-17 19:48:...|            239|
|          Be my baby|OL1534595A|        2009|2009-02-17 19:48:...|         



# First Published Author

In [263]:
str=df_author_explode.orderBy(col("publish_date").asc(),col("created_time").asc()).head().author
print(str)

[Stage 269:>                                                        (0 + 1) / 2]

OL2170249A


                                                                                

# Last Published Author

In [197]:
df_author_explode.orderBy(col("publish_date").desc(),col("created_time").desc()).head().author

                                                                                

'OL1390027A'

# Windowing to get the first and last published author each year

In [198]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number,first,last,countDistinct
windowSpec  = Window.partitionBy("publish_date").orderBy("created_time")
windowSpec1  = Window.partitionBy("publish_date").orderBy(col("created_time").desc())
df_first_last_author_year = (
    df_author_explode
                 .withColumn("fv",first("author", True).over(windowSpec))
                 .withColumn("lv",first("author", True).over(windowSpec1))
                 .select("publish_date","author","title","created_time","fv","lv")
)
df_first_last_author_year.groupBy("publish_date","fv","lv").agg(countDistinct("author")).orderBy("publish_date").show(100)




+------------+----------+----------+-------------+
|publish_date|        fv|        lv|count(author)|
+------------+----------+----------+-------------+
|        1950|OL2170249A|OL6570747A|          614|
|        1951|OL2170249A|OL6570639A|          648|
|        1952|OL6535912A|OL6570637A|          601|
|        1953|OL2261944A| OL731406A|          600|
|        1954|OL6535994A| OL120803A|          644|
|        1955| OL135548A|OL4673667A|          588|
|        1956|OL4880801A|OL6570892A|          638|
|        1957|OL1786598A| OL234204A|          705|
|        1958|OL4491638A|OL1674060A|          753|
|        1959|OL6535916A| OL367501A|          830|
|        1960|OL6314966A| OL142020A|          920|
|        1961|OL1642212A|OL6570918A|         1026|
|        1962|OL4409259A|OL2194405A|         1064|
|        1963|OL6535908A|OL4673667A|         1099|
|        1964|OL5900510A| OL409560A|         1194|
|        1965|OL5459805A|OL6561328A|         1248|
|        1966| OL133670A|OL6561

                                                                                

# top 5 genres with most published books

In [194]:
 from pyspark.sql.functions import translate,count,countDistinct

cleaned_books_genre_temp = (
                        df_dqc.select('title',explode("genres").alias("genr"))
                              .select("title", translate('genr','.','').alias("genre"))
                       )

cleaned_books_genre =(
                      cleaned_books_genre_temp.groupBy("genre")
                          .agg(count("title").alias("count_title"))
                          .orderBy('count_title', ascending=False).limit(5)
                      )
cleaned_books_genre.show()



+-------------------+-----------+
|              genre|count_title|
+-------------------+-----------+
|            Fiction|       3418|
|          Biography|       2792|
|Juvenile literature|       1682|
|        Exhibitions|       1152|
|   Juvenile fiction|        700|
+-------------------+-----------+



                                                                                

# Retrieve the top 5 authors who (co-)authored the most books

In [225]:
from pyspark.sql.window import Window
windowSpec  = Window.partitionBy("title")
df_unque_title_author = df_author_explode.select('title','author').distinct()

df_title_co_authors = (
                     df_unque_title_author
                    .withColumn("cnt",count("author").over(windowSpec))
                    .filter(col("cnt") > 1)
)

df_top_authors_co_authors = (
                                df_title_co_authors.groupBy("author")
                                .agg(countDistinct("title").alias("count_title"))
                                .orderBy('count_title', ascending=False).limit(5)
)
df_top_authors_co_authors.show()



+----------+-----------+
|    author|count_title|
+----------+-----------+
|OL4933802A|          8|
| OL283941A|          8|
| OL785848A|          7|
|OL4416445A|          6|
|OL6534774A|          6|
+----------+-----------+





# Per publish year, the number of authors that published at least one book

In [231]:
 from pyspark.sql.functions import countDistinct
df_authors_per_year = (
                        df_author_explode.groupBy("publish_date").agg(countDistinct("author"))
                       .orderBy('publish_date')
)
df_authors_per_year.show(1000)



+------------+-------------+
|publish_date|count(author)|
+------------+-------------+
|        1950|          614|
|        1951|          648|
|        1952|          601|
|        1953|          600|
|        1954|          644|
|        1955|          588|
|        1956|          638|
|        1957|          705|
|        1958|          753|
|        1959|          830|
|        1960|          920|
|        1961|         1026|
|        1962|         1064|
|        1963|         1099|
|        1964|         1194|
|        1965|         1248|
|        1966|         1203|
|        1967|         1243|
|        1968|         1082|
|        1969|         1209|
|        1970|         1139|
|        1971|         1070|
|        1972|         1010|
|        1973|         1135|
|        1974|         1096|
|        1975|          985|
|        1976|          888|
|        1977|          704|
|        1978|          735|
|        1979|          813|
|        1980|          852|
|        1981|

                                                                                

# The number of authors and number of books published per month for years between 1950 and 1970

In [265]:
from pyspark.sql.functions import month,countDistinct
df_month_stat =  (
                 df_author_explode
                .filter("publish_date between 1950 and 1970")
                .withColumn("mon",month("created_time"))
                .groupBy("mon","publish_date").agg(countDistinct("author").alias("author_cnt")
                ,countDistinct("title").alias("title_count")).orderBy("publish_date","mon")
)
df_month_stat.show(1000)



+---+------------+----------+-----------+
|mon|publish_date|author_cnt|title_count|
+---+------------+----------+-----------+
|  1|        1950|       204|        206|
|  2|        1950|       377|        399|
|  9|        1950|         1|          1|
| 12|        1950|        39|         40|
|  1|        1951|       212|        217|
|  2|        1951|       404|        420|
|  9|        1951|         1|          1|
| 12|        1951|        37|         37|
|  1|        1952|       176|        185|
|  2|        1952|       386|        407|
| 12|        1952|        40|         41|
|  1|        1953|       210|        216|
|  2|        1953|       361|        371|
| 12|        1953|        34|         34|
|  1|        1954|       218|        220|
|  2|        1954|       381|        391|
| 12|        1954|        50|         50|
|  1|        1955|       211|        214|
|  2|        1955|       341|        357|
| 12|        1955|        40|         40|
|  1|        1956|       202|     

                                                                                

# max Pages per book for an author/title

In [48]:
df_max_page=df_author_explode.groupBy("author", "title").agg(max("number_of_pages").alias("number_of_pages"))

In [47]:
df_max_page.show()

[Stage 11:>                                                         (0 + 1) / 2]

+----------+--------------------+---------------+
|    author|               title|number_of_pages|
+----------+--------------------+---------------+
| OL848617A|Filosofia e polít...|            181|
| OL254163A|A dance to the mu...|            792|
|OL3249688A|Elementary linear...|            524|
|OL5012521A|             Alfalfa|            174|
| OL578182A|              Handel|             24|
| OL112923A|     Rose fairy book|            212|
|OL6321250A|Radiation effects...|            274|
|OL6533480A|Dithola Tša mahla...|             75|
|OL1225706A|Théâtre d'avant-g...|            269|
|OL6537341A|Windows, walls, b...|             69|
|OL6537462A|Comparison of pra...|            111|
|OL4395333A|        Biochemistry|            446|
|OL2896523A|    Devil's doorstep|            192|
|OL1444713A|Census of industr...|            219|
|OL4426843A|          Range camp|            155|
|OL4798258A|       Blown to hell|            186|
|OL1551147A| Havoc in the Indies|            208|




# Postgress Write Check

In [5]:
(df_movies_csv.write
 .format("jdbc")
 .option("url", "jdbc:postgresql://postgres/test")
 .option("dbtable", "public.movies")
 .option("user", "test")
 .option("password", "postgres")
 .mode("overwrite")
 .save())

In [6]:
(df_ratings_csv_fmt
 .select([c for c in df_ratings_csv_fmt.columns if c != "timestamp_epoch"])
 .write
 .format("jdbc")
 .option("url", "jdbc:postgresql://postgres/test")
 .option("dbtable", "public.ratings")
 .option("user", "test")
 .option("password", "postgres")
 .mode("overwrite")
 .save())

                                                                                

In [7]:
df_ratings_csv_fmt.show()

+------+-------+------+---------------+-------------------+
|userId|movieId|rating|timestamp_epoch|          timestamp|
+------+-------+------+---------------+-------------------+
|     1|      1|   4.0|      964982703|2000-07-30 18:45:03|
|     1|      3|   4.0|      964981247|2000-07-30 18:20:47|
|     1|      6|   4.0|      964982224|2000-07-30 18:37:04|
|     1|     47|   5.0|      964983815|2000-07-30 19:03:35|
|     1|     50|   5.0|      964982931|2000-07-30 18:48:51|
|     1|     70|   3.0|      964982400|2000-07-30 18:40:00|
|     1|    101|   5.0|      964980868|2000-07-30 18:14:28|
|     1|    110|   4.0|      964982176|2000-07-30 18:36:16|
|     1|    151|   5.0|      964984041|2000-07-30 19:07:21|
|     1|    157|   5.0|      964984100|2000-07-30 19:08:20|
|     1|    163|   5.0|      964983650|2000-07-30 19:00:50|
|     1|    216|   5.0|      964981208|2000-07-30 18:20:08|
|     1|    223|   3.0|      964980985|2000-07-30 18:16:25|
|     1|    231|   5.0|      964981179|2

DataFrame[userId: string, movieId: string, rating: double, timestamp_epoch: string, timestamp: timestamp]