In [1]:
import os
from getpass import getpass

from pyspark.sql import SparkSession
# NOTE: min/max here are Spark's aggregate functions and shadow the Python builtins
# for the rest of the notebook.
from pyspark.sql.functions import year, month, min, max, col, desc, split, explode

# Reuse the active Spark session if there is one; otherwise start a new one.
spark = (
    SparkSession.builder
    .appName('Read CSV File into DataFrame')
    .getOrCreate()
)

# Part 1

## How many Google items were purchased in November 2020?

In [2]:
# Event log with a header row. Column types are inferred from the data; the output below
# shows user_id, ga_session_id, country, device, type, item_id and date.
events = spark.read.csv(
    'events1.csv',
    header=True,
    inferSchema=True,
    sep=',',
)
events.show()

+-------+-------------+-------+-------+-----------+-------+-------------------+
|user_id|ga_session_id|country| device|       type|item_id|               date|
+-------+-------------+-------+-------+-----------+-------+-------------------+
|   2133|        16909|     US| mobile|   purchase|     94|2020-11-01 00:27:14|
|   2133|        16909|     US| mobile|   purchase|    425|2020-11-01 00:27:14|
|   5789|        16908|     SE|desktop|   purchase|      1|2020-11-01 01:44:44|
|   5789|        16908|     SE|desktop|   purchase|     62|2020-11-01 01:44:44|
|   5808|         4267|     US| mobile|add_to_cart|    842|2020-11-01 03:06:29|
|   5808|         4267|     US| mobile|add_to_cart|    951|2020-11-01 03:06:29|
|   5808|         4267|     US| mobile|add_to_cart|    950|2020-11-01 03:06:29|
|   5808|         4267|     US| mobile|add_to_cart|   1068|2020-11-01 03:06:29|
|   5808|         4267|     US| mobile|add_to_cart|    862|2020-11-01 03:06:29|
|   5808|         4267|     US| mobile|a

In [3]:
# Each purchase event row is counted once, restricted to November 2020.
is_purchase = col("type") == "purchase"
in_november_2020 = (month(col("date")) == 11) & (year(col("date")) == 2020)
purchase_events = events.filter(is_purchase & in_november_2020)
purchase_events.count()

6190

## How many items were purchased on mobile devices in December?

In [4]:
# Purchase events made from a mobile device in December.
# NOTE(review): unlike the November query, no year filter is applied, so if events1.csv
# covers more than one December, all of them are counted together.
mobile_purchase_events = events.filter(
    (col("type") == "purchase")
    & (col("device") == "mobile")
    & (month(col("date")) == 12)
)
mobile_purchase_events.count()

2625

## Sort the countries by number of items sold, in descending order

In [5]:
# All purchase events (any date). Reused below for the revenue-per-country question.
all_purchase_event = events.filter(col("type") == "purchase")
purchase_countries = all_purchase_event.select("country")

# Number of purchased items per country, largest first.
country_purchase_count = (
    purchase_countries
    .groupBy("country")
    .count()
    .orderBy(desc("count"))
)
country_purchase_count.show()

+-------+-----+
|country|count|
+-------+-----+
|     US| 6849|
|     IN| 1462|
|     CA| 1361|
|     GB|  457|
|     ES|  384|
|     FR|  344|
|     CN|  279|
|     DE|  250|
|     TR|  239|
|     TW|  231|
|     JP|  229|
|     BR|  191|
|     SG|  178|
|     NL|  172|
|     IT|  159|
|     AU|  148|
|     KR|  143|
|     MY|  120|
|     MX|  116|
|     PL|  111|
+-------+-----+
only showing top 20 rows



## What is the price range of items purchased in the US?

In [6]:
# Item catalogue. Its `id` matches events.item_id, and it provides `price_in_usd`.
items = spark.read.csv('items.csv', sep=',', inferSchema=True, header=True)

# Purchases made in the US, joined to the catalogue to attach each item's price.
us_purchase_events = events.filter((col("type") == "purchase") & (col("country") == "US"))
result = us_purchase_events.join(items, us_purchase_events.item_id == items.id, "inner")
purchase_prices = result.select("country", "price_in_usd")

# Single-row summary: cheapest and most expensive purchased item (Spark min/max aggregates).
us_price_range = purchase_prices.agg(
    min("price_in_usd").alias("min_price"),
    max("price_in_usd").alias("max_price"),
)
us_price_range.show()

+---------+---------+
|min_price|max_price|
+---------+---------+
|        1|      120|
+---------+---------+



## Which country has the highest income from sold items?

In [7]:
# Attach a price to every purchase event (all countries), then total the revenue per country.
result = all_purchase_event.join(items, all_purchase_event.item_id == items.id, "inner")
purchase_prices = result.select("country", "price_in_usd")

highest_income_country = (
    purchase_prices
    .groupBy("country")
    .sum("price_in_usd")
    .withColumnRenamed("sum(price_in_usd)", "country_total_sales")
    .orderBy(desc("country_total_sales"))
)
# Only the top country is displayed. If several countries tie, just one of them is shown.
highest_income_country.limit(1).show()

+-------+-------------------+
|country|country_total_sales|
+-------+-------------------+
|     US|             137409|
+-------+-------------------+



# Part 2

## Group the movies by genre and list the movies whose ratings are higher than the average rating of their genre

In [8]:
# Movie catalogue and user ratings. Both files have a header row and inferred column types.
movie = spark.read.csv(
    'movie.csv',
    header=True,
    inferSchema=True,
    sep=',',
)
rating = spark.read.csv(
    'rating.csv',
    header=True,
    inferSchema=True,
    sep=',',
)

In [9]:
# Join every rating with its movie's metadata. Then split the pipe-separated `genres`
# string into separate columns (genre, genre_1, ..., genre_8) for inspection.
# NOTE: only the first MAX_GENRE_COLUMNS genres get a column here.
MAX_GENRE_COLUMNS = 9

master_frame = movie.join(rating, ["movieId"], "inner")

# `split` takes a Java regex, so the pipe must be escaped. The raw string avoids Python's
# invalid-escape warning for "\|" in a plain literal.
genre_parts = split(col("genres"), r"\|")
master_frame_genre = master_frame.withColumn("genre", genre_parts.getItem(0))
for i in range(1, MAX_GENRE_COLUMNS):
    master_frame_genre = master_frame_genre.withColumn(f"genre_{i}", genre_parts.getItem(i))
master_frame_genre.show()

+-------+--------------------+--------------------+------+------+-------------------+---------+---------+--------+--------+-------+-------+-------+-------+-------+
|movieId|               title|              genres|userId|rating|          timestamp|    genre|  genre_1| genre_2| genre_3|genre_4|genre_5|genre_6|genre_7|genre_8|
+-------+--------------------+--------------------+------+------+-------------------+---------+---------+--------+--------+-------+-------+-------+-------+-------+
|      2|      Jumanji (1995)|Adventure|Childre...|     1|   3.5|2005-04-02 23:53:47|Adventure| Children| Fantasy|    null|   null|   null|   null|   null|   null|
|     29|City of Lost Chil...|Adventure|Drama|F...|     1|   3.5|2005-04-02 23:31:16|Adventure|    Drama| Fantasy| Mystery| Sci-Fi|   null|   null|   null|   null|
|     32|Twelve Monkeys (a...|Mystery|Sci-Fi|Th...|     1|   3.5|2005-04-02 23:33:39|  Mystery|   Sci-Fi|Thriller|    null|   null|   null|   null|   null|   null|
|     47|Seven (

In [10]:
# One row per (rating, genre) pair. The pipe-separated `genres` string is exploded
# instead of unioning a fixed set of pre-split columns. With the old approach, genres
# past the ninth were silently dropped, and the movie/rating join was recomputed once
# per unioned column.
Movie_Genre = (
    master_frame
    .select(
        "movieId", "title", "userId", "rating", "timestamp",
        explode(split(col("genres"), r"\|")).alias("genre"),
    )
    .filter(col("genre").isNotNull())
)
Movie_Genre.sort(desc("movieId")).show()

+-------+--------------------+------+------+-------------------+------------------+
|movieId|               title|userId|rating|          timestamp|             genre|
+-------+--------------------+------+------+-------------------+------------------+
| 131262|    Innocence (2014)|133047|   4.0|2015-03-30 20:39:26|            Horror|
| 131262|    Innocence (2014)|133047|   4.0|2015-03-30 20:39:26|         Adventure|
| 131262|    Innocence (2014)|133047|   4.0|2015-03-30 20:39:26|           Fantasy|
| 131260| Rentun Ruusu (2001)| 65409|   3.0|2015-03-30 19:57:46|(no genres listed)|
| 131258|  The Pirates (2014)| 28906|   2.5|2015-03-30 19:56:32|         Adventure|
| 131256|Feuer, Eis & Dose...| 79570|   4.0|2015-03-30 19:48:08|            Comedy|
| 131254|Kein Bund für's L...| 79570|   4.0|2015-03-30 19:32:59|            Comedy|
| 131252|Forklift Driver K...| 79570|   4.0|2015-03-30 19:20:55|            Horror|
| 131252|Forklift Driver K...| 79570|   4.0|2015-03-30 19:20:55|            

In [11]:
# Mean rating per genre. A rating is counted once for every genre its movie has.
Genre_Rating = Movie_Genre.select("genre", "rating")
Avg_Genre_Rating = (
    Genre_Rating
    .groupBy("genre")
    .avg("rating")
    .withColumnRenamed("avg(rating)", "avg_rating")
)
Avg_Genre_Rating.show()

+------------------+------------------+
|             genre|        avg_rating|
+------------------+------------------+
|             Crime|3.6745276025631113|
|           Romance| 3.541802581902903|
|          Thriller|  3.50711121809216|
|         Adventure|3.5018926565473865|
|             Drama|3.6743003030009844|
|               War|3.8095307347384844|
|       Documentary|3.7397176834178865|
|           Fantasy|3.5059453358738244|
|           Mystery| 3.663508921312903|
|           Musical|3.5582347604391567|
|         Animation| 3.617498487465694|
|         Film-Noir|  3.96538126070082|
|(no genres listed)|3.0069252077562325|
|              IMAX| 3.655945983272606|
|            Horror|3.2772238097518307|
|           Western| 3.570710170932099|
|            Comedy|3.4260113054324886|
|          Children|3.4081137685270444|
|            Action|3.4438645064783917|
|            Sci-Fi|3.4367739795278616|
+------------------+------------------+



In [12]:
# Register both frames as session temp views for the Spark SQL query below.
# createOrReplaceTempView keeps the cell re-runnable. createTempView raises if the view
# already exists in the current session.
Avg_Genre_Rating.createOrReplaceTempView("Avg_Genre_Rating")
Movie_Genre.createOrReplaceTempView("Movie_Genre")

In [13]:
# Keep each (rating, genre) row whose rating is strictly higher than that genre's average.
# Columns are listed explicitly. `SELECT *` over this join returns two `genre` columns,
# and the JDBC write below would then try to create a table with a duplicated column name.
# NOTE(review): this compares individual ratings, not a per-movie average rating.
# Confirm that this is the intended reading of the question.
Movie_Genre_Better_AVG = spark.sql("""
    SELECT m.movieId, m.title, m.userId, m.rating, m.timestamp, m.genre, a.avg_rating
    FROM Movie_Genre AS m
    INNER JOIN Avg_Genre_Rating AS a
        ON a.genre = m.genre
    WHERE m.rating > a.avg_rating
""")

In [14]:
# Persist the result over JDBC. Connection details come from the environment, so no
# credentials are stored in the notebook:
#   JDBC_URL    (required) - e.g. the database's jdbc:... connection string
#   DB_USER     (optional) - defaults to an empty user name
#   DB_PASSWORD (optional) - if unset, you are prompted for it interactively
jdbcUrl = os.environ["JDBC_URL"]
user = os.environ.get("DB_USER", "")
password = (
    os.environ["DB_PASSWORD"] if "DB_PASSWORD" in os.environ
    else getpass("Database password: ")
)

(
    Movie_Genre_Better_AVG.write.format("jdbc")
    .mode("overwrite")  # overwrites any existing data in the target table
    .option("url", jdbcUrl)
    .option("dbtable", "Movie.Movie_Genre")
    .option("user", user)
    .option("password", password)
    .save()
)