
# Preparing Data Using Spark SQL

Guide:

Mastering Big Data Analytics with PySpark
https://learning-oreilly-com.hcpl.idm.oclc.org/videos/mastering-big-data/9781838640583/9781838640583-video3_1/


Spark SQL is the Spark module for querying structured data inside Spark programs.

#### Loading and exploring data

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName("MyFirstCSVLoad1.csv").getOrCreate()

In [None]:
df = spark.read.csv(
    path="ratings.csv",
    sep=",",
    header=True,
    quote="",  # an empty string turns off quote handling
    schema="userId INT, movieId INT, rating DOUBLE, timestamp INT",
)

As an alternative to an explicit schema, you can set inferSchema=True and let Spark guess the column types.
That is fine for exploration, but not recommended in production: inference costs an extra pass over the data, and the resulting types depend on whatever values happen to be in the file.

In [None]:
df.show(5)

In [None]:
df.printSchema()

#### Wrangling data

In [7]:
from pyspark.sql import functions as f

In [None]:
# Option 1: one operation per statement
# df = df.withColumnRenamed("timestamp", "timestamp_unix")
# df = df.withColumn("timestamp", f.from_unixtime("timestamp_unix"))  # epoch seconds -> "yyyy-MM-dd HH:mm:ss" string
# df = df.withColumn("timestamp", f.to_timestamp("timestamp"))         # string -> timestamp type

# Option 2: chain the same operations in one expression
# df = (
#     df
#     .withColumnRenamed("timestamp", "timestamp_unix")
#     .withColumn("timestamp", f.to_timestamp(f.from_unixtime("timestamp_unix")))
# )

df.show(5)
df.printSchema()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [None]:
df = (
    spark.read.csv(
        path="ratings.csv",
        sep=",",
        header=True,
        quote="",
        schema="userId INT, movieId INT, rating DOUBLE, timestamp INT",
    )
    .withColumnRenamed("timestamp", "timestamp_unix")
    .withColumn("timestamp", f.to_timestamp(f.from_unixtime("timestamp_unix")))
)

df.show(5)
df.printSchema()

+------+-------+------+--------------+-------------------+
|userId|movieId|rating|timestamp_unix|          timestamp|
+------+-------+------+--------------+-------------------+
|     1|      1|   4.0|     964982703|2000-07-30 18:45:03|
|     1|      3|   4.0|     964981247|2000-07-30 18:20:47|
|     1|      6|   4.0|     964982224|2000-07-30 18:37:04|
|     1|     47|   5.0|     964983815|2000-07-30 19:03:35|
|     1|     50|   5.0|     964982931|2000-07-30 18:48:51|
+------+-------+------+--------------+-------------------+
only showing top 5 rows

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp_unix: integer (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [None]:
df.drop("timestamp_unix").show(5)

+------+-------+------+-------------------+
|userId|movieId|rating|          timestamp|
+------+-------+------+-------------------+
|     1|      1|   4.0|2000-07-30 18:45:03|
|     1|      3|   4.0|2000-07-30 18:20:47|
|     1|      6|   4.0|2000-07-30 18:37:04|
|     1|     47|   5.0|2000-07-30 19:03:35|
|     1|     50|   5.0|2000-07-30 18:48:51|
+------+-------+------+-------------------+
only showing top 5 rows



### PART 2

Working with movies

In [18]:
ratings = (
    spark.read.csv(
        path="ratings.csv",
        sep=",",
        header=True,
        quote="",
        schema="userId INT, movieId INT, rating DOUBLE, timestamp INT",
    )
    .withColumn("timestamp", f.to_timestamp(f.from_unixtime("timestamp")))
)

In [None]:
ratings.show(5)
ratings.printSchema()

In [5]:
movies = spark.read.csv(
    path="movies.csv",
    sep=",",
    header=True,
    quote='"',  # some titles contain commas, so they are wrapped in double quotes
    schema="movieID INT, title STRING, genres STRING",
)


movies.show(5, truncate=False)
movies.printSchema()

+-------+----------------------------------+-------------------------------------------+
|movieID|title                             |genres                                     |
+-------+----------------------------------+-------------------------------------------+
|1      |Toy Story (1995)                  |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Jumanji (1995)                    |Adventure|Children|Fantasy                 |
|3      |Grumpier Old Men (1995)           |Comedy|Romance                             |
|4      |Waiting to Exhale (1995)          |Comedy|Drama|Romance                       |
|5      |Father of the Bride Part II (1995)|Comedy                                     |
+-------+----------------------------------+-------------------------------------------+
only showing top 5 rows

root
 |-- movieID: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



Filtering

In [None]:
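# Option 1: where() also accepts a SQL expression string
# movies.where("genres = 'Action'").show()

# Option 2: build the condition from Column objects
# Both options match only rows whose whole genres string equals "Action"
movies.where(f.col("genres") == "Action").show(5, False)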

In [8]:
movie_genre = (
    movies
    .withColumn("genres_array", f.split("genres", r"\|"))  # split() takes a regex, so "|" must be escaped
    .withColumn("genre", f.explode("genres_array"))
    .select("movieID", "title", "genre")
)

In [None]:
movie_genre.show()
# movie_genre.printSchema()

+-------+--------------------+---------+
|movieID|               title|    genre|
+-------+--------------------+---------+
|      1|    Toy Story (1995)|Adventure|
|      1|    Toy Story (1995)|Animation|
|      1|    Toy Story (1995)| Children|
|      1|    Toy Story (1995)|   Comedy|
|      1|    Toy Story (1995)|  Fantasy|
|      2|      Jumanji (1995)|Adventure|
|      2|      Jumanji (1995)| Children|
|      2|      Jumanji (1995)|  Fantasy|
|      3|Grumpier Old Men ...|   Comedy|
|      3|Grumpier Old Men ...|  Romance|
|      4|Waiting to Exhale...|   Comedy|
|      4|Waiting to Exhale...|    Drama|
|      4|Waiting to Exhale...|  Romance|
|      5|Father of the Bri...|   Comedy|
|      6|         Heat (1995)|   Action|
|      6|         Heat (1995)|    Crime|
|      6|         Heat (1995)| Thriller|
|      7|      Sabrina (1995)|   Comedy|
|      7|      Sabrina (1995)|  Romance|
|      8| Tom and Huck (1995)|Adventure|
+-------+--------------------+---------+
only showing top 20 rows

In [None]:
available_genres = movie_genre.select("genre").distinct()

In [None]:
titles_distinct = movie_genre.select("title").distinct()

In [None]:
titles_distinct.show(50)

+--------------------+
|               title|
+--------------------+
|    Fair Game (1995)|
| If Lucy Fell (1996)|
|           "Birdcage|
| Three Wishes (1995)|
|Heavenly Creature...|
|Snow White and th...|
|Night of the Livi...|
|"Streetcar Named ...|
|When We Were King...|
|       Psycho (1960)|
|   Annie Hall (1977)|
|Men in Black (a.k...|
| "Life Less Ordinary|
|In the Heat of th...|
|    Elizabeth (1998)|
|First Blood (Ramb...|
|Problem Child (1990)|
|Gulliver's Travel...|
|Man Bites Dog (C'...|
|One False Move (1...|
|       Quills (2000)|
|Before Night Fall...|
|Don't Tell Mom th...|
|"Abominable Dr. P...|
|   Love Story (1970)|
|Captain Corelli's...|
|     Fat City (1972)|
|              "Jetée|
|        Birth (2004)|
|Damn Yankees! (1958)|
|Smiley's People (...|
|Boy Meets Girl (1...|
|My Father the Her...|
|Starship Troopers...|
|Germany Year Zero...|
|  Thumbsucker (2005)|
|Survive Style 5+ ...|
| Just Friends (2005)|
|Mozart and the Wh...|
|    Dead Meat (2004)|
|      "Na

In [9]:
movie_genre.count()

22084

In [None]:
available_genres.count()

20

In [None]:
available_genres.show(200)

+------------------+
|             genre|
+------------------+
|             Crime|
|           Romance|
|          Thriller|
|         Adventure|
|             Drama|
|               War|
|       Documentary|
|           Fantasy|
|           Mystery|
|           Musical|
|         Animation|
|         Film-Noir|
|(no genres listed)|
|              IMAX|
|            Horror|
|           Western|
|            Comedy|
|          Children|
|            Action|
|            Sci-Fi|
+------------------+



The output above includes the placeholder value (no genres listed). Let's find out how many movies have no genre specified.

In [None]:
movies_without_genre = movies.where(f.col("genres") == "(no genres listed)")

In [None]:
print(movies_without_genre.count())

34


In [None]:
movies_without_genre.show(20)

# Grouping, Joining, Aggregating

In [None]:
links = spark.read.csv(
    path="links.csv",
    sep=",",
    header=True,
    quote='"',
    # imdbId is zero-padded in the file (e.g. 0114709); reading it as INT drops the leading zeros
    schema="movieId INT, imdbId INT, tmdbId INT",
)


In [10]:
tags = spark.read.csv(
    path="tags.csv",
    sep=",",
    header=True,
    quote='"',
    schema="userId INT, movieId INT, tag STRING, timestamp INT",
).withColumn("timestamp", f.to_timestamp(f.from_unixtime("timestamp")))

In [None]:
links.show(15, truncate=False)
links.printSchema()

In [None]:
tags.show(15)
tags.printSchema()

#### Group by genre

Count how many movies are associated with every genre

In [None]:
movies_per_genre = movie_genre.groupBy("genre").count()

In [None]:
movies_per_genre.show()

Count how many genres are associated with every movieId

In [None]:
genres_per_movie = movie_genre.groupBy("movieId").count()

In [None]:
genres_per_movie.show()

#### Join dataframes

Let's look at the tags data

In [11]:
tags.show(15)

+------+-------+-----------------+-------------------+
|userId|movieId|              tag|          timestamp|
+------+-------+-----------------+-------------------+
|     2|  60756|            funny|2015-10-24 19:29:54|
|     2|  60756|  Highly quotable|2015-10-24 19:29:56|
|     2|  60756|     will ferrell|2015-10-24 19:29:52|
|     2|  89774|     Boxing story|2015-10-24 19:33:27|
|     2|  89774|              MMA|2015-10-24 19:33:20|
|     2|  89774|        Tom Hardy|2015-10-24 19:33:25|
|     2| 106782|            drugs|2015-10-24 19:30:54|
|     2| 106782|Leonardo DiCaprio|2015-10-24 19:30:51|
|     2| 106782|  Martin Scorsese|2015-10-24 19:30:56|
|     7|  48516|     way too long|2007-01-25 01:08:45|
|    18|    431|        Al Pacino|2016-05-01 21:39:25|
|    18|    431|         gangster|2016-05-01 21:39:09|
|    18|    431|            mafia|2016-05-01 21:39:15|
|    18|   1221|        Al Pacino|2016-04-26 19:35:06|
|    18|   1221|            Mafia|2016-04-26 19:35:03|
+------+-------+-----------------+-------------------+
only showing top 15 rows

The tags table identifies movies only by movieId.
To see the movie title next to each tag, we can join it with movie_genre.

In [None]:
opinions = movie_genre.join(tags, movie_genre["movieID"] == tags["movieID"])

In [None]:
opinions.show(5)

+-------+----------------+---------+------+-------+-----+-------------------+
|movieID|           title|    genre|userId|movieId|  tag|          timestamp|
+-------+----------------+---------+------+-------+-----+-------------------+
|      1|Toy Story (1995)|Adventure|   567|      1|  fun|2018-05-02 18:33:33|
|      1|Toy Story (1995)|Adventure|   474|      1|pixar|2006-01-14 02:47:05|
|      1|Toy Story (1995)|Adventure|   336|      1|pixar|2006-02-04 09:36:04|
|      1|Toy Story (1995)|Animation|   567|      1|  fun|2018-05-02 18:33:33|
|      1|Toy Story (1995)|Animation|   474|      1|pixar|2006-01-14 02:47:05|
+-------+----------------+---------+------+-------+-----+-------------------+
only showing top 5 rows



When the join column has the same name in both DataFrames, you can pass the column name (or a list of names) instead of a join condition. Spark then keeps a single copy of the join column:

In [14]:
opinions = (
    movie_genre
    .join(tags, ["movieID"], "inner")
    .select("userID", "movieID", "title", "tag", "timestamp")
)

In [15]:
opinions.show(3)

+------+-------+----------------+-----+-------------------+
|userID|movieID|           title|  tag|          timestamp|
+------+-------+----------------+-----+-------------------+
|   567|      1|Toy Story (1995)|  fun|2018-05-02 18:33:33|
|   474|      1|Toy Story (1995)|pixar|2006-01-14 02:47:05|
|   336|      1|Toy Story (1995)|pixar|2006-02-04 09:36:04|
+------+-------+----------------+-----+-------------------+
only showing top 3 rows



Now let's add the ratings as well. Passing a list of column names joins on all of them, here movieID and userID:

In [20]:
opinions.join(ratings, ["movieID", "userID"]).show(5)

+-------+------+----------------+-----+-------------------+------+-------------------+
|movieID|userID|           title|  tag|          timestamp|rating|          timestamp|
+-------+------+----------------+-----+-------------------+------+-------------------+
|      1|   567|Toy Story (1995)|  fun|2018-05-02 18:33:33|   3.5|2018-05-02 18:33:21|
|      1|   474|Toy Story (1995)|pixar|2006-01-14 02:47:05|   4.0|2001-01-04 02:36:00|
|      1|   336|Toy Story (1995)|pixar|2006-02-04 09:36:04|   4.0|2005-07-24 17:48:49|
|      1|   567|Toy Story (1995)|  fun|2018-05-02 18:33:33|   3.5|2018-05-02 18:33:21|
|      1|   474|Toy Story (1995)|pixar|2006-01-14 02:47:05|   4.0|2001-01-04 02:36:00|
+-------+------+----------------+-----+-------------------+------+-------------------+
only showing top 5 rows



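Both tables have a timestamp column, so the result above contains two columns with the same name, and referring to timestamp afterwards is ambiguous. To keep both, rename one of them before the join: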

In [None]:
opinions_ext = (
    opinions
    .withColumnRenamed("timestamp", "tag_time")
    .join(ratings, ["movieID", "userID"])
)
opinions_ext.show(5)  # show() returns None, so call it separately from the assignment

#### Different kinds of joins

In [None]:
opinions = movie_genre.join(tags, ["movieID"])  # no join type given, so this is an inner join

In [None]:
opinions.show(3)

+-------+----------------+---------+------+-----+-------------------+
|movieID|           title|    genre|userId|  tag|          timestamp|
+-------+----------------+---------+------+-----+-------------------+
|      1|Toy Story (1995)|Adventure|   567|  fun|2018-05-02 18:33:33|
|      1|Toy Story (1995)|Adventure|   474|pixar|2006-01-14 02:47:05|
|      1|Toy Story (1995)|Adventure|   336|pixar|2006-02-04 09:36:04|
+-------+----------------+---------+------+-----+-------------------+
only showing top 3 rows



In [None]:
opinions = movie_genre.join(tags, ["movieID"], "inner")

In [None]:
opinions.show(3)

+-------+----------------+---------+------+-----+-------------------+
|movieID|           title|    genre|userId|  tag|          timestamp|
+-------+----------------+---------+------+-----+-------------------+
|      1|Toy Story (1995)|Adventure|   567|  fun|2018-05-02 18:33:33|
|      1|Toy Story (1995)|Adventure|   474|pixar|2006-01-14 02:47:05|
|      1|Toy Story (1995)|Adventure|   336|pixar|2006-02-04 09:36:04|
+-------+----------------+---------+------+-----+-------------------+
only showing top 3 rows



In [None]:
opinions = movie_genre.join(tags, ["movieID"], "outer")  # full outer join

In [None]:
opinions.show(3)

+-------+----------------+---------+------+-----+-------------------+
|movieID|           title|    genre|userId|  tag|          timestamp|
+-------+----------------+---------+------+-----+-------------------+
|      1|Toy Story (1995)|Adventure|   336|pixar|2006-02-04 09:36:04|
|      1|Toy Story (1995)|Adventure|   474|pixar|2006-01-14 02:47:05|
|      1|Toy Story (1995)|Adventure|   567|  fun|2018-05-02 18:33:33|
+-------+----------------+---------+------+-----+-------------------+
only showing top 3 rows



In [None]:
opinions = movie_genre.join(tags, ["movieID"], "left")

In [None]:
opinions.show(3)

+-------+----------------+---------+------+-----+-------------------+
|movieID|           title|    genre|userId|  tag|          timestamp|
+-------+----------------+---------+------+-----+-------------------+
|      1|Toy Story (1995)|Adventure|   567|  fun|2018-05-02 18:33:33|
|      1|Toy Story (1995)|Adventure|   474|pixar|2006-01-14 02:47:05|
|      1|Toy Story (1995)|Adventure|   336|pixar|2006-02-04 09:36:04|
+-------+----------------+---------+------+-----+-------------------+
only showing top 3 rows



In [None]:
opinions = movie_genre.join(tags, ["movieID"], "right")

In [None]:
opinions.show(3)

+-------+--------------------+------+------+---------------+-------------------+
|movieId|               title| genre|userId|            tag|          timestamp|
+-------+--------------------+------+------+---------------+-------------------+
|  60756|Step Brothers (2008)|Comedy|     2|          funny|2015-10-24 19:29:54|
|  60756|Step Brothers (2008)|Comedy|     2|Highly quotable|2015-10-24 19:29:56|
|  60756|Step Brothers (2008)|Comedy|     2|   will ferrell|2015-10-24 19:29:52|
+-------+--------------------+------+------+---------------+-------------------+
only showing top 3 rows

