## First stage.

Let's install the PySpark package in the local environment. Before running the pip install pyspark command, check that Java/OpenJDK (version 11 or later) and Python 3.6+ are already installed.

In [1]:
pip install pyspark 

Note: you may need to restart the kernel to use updated packages.


Great! Import the SparkSession class from the pyspark.sql module.

In [2]:
from pyspark.sql import SparkSession

By calling SparkSession.builder and assigning the result to the spark variable, we create the "entry point" to Spark on the local machine.

In [3]:
# use SparkSession

spark = SparkSession.builder.appName("Test").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/10/19 15:11:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


A test run may print a message such as *22/10/18 20:37:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable*. This is a warning, not an error: Spark falls back to its built-in Java classes and the session starts normally. To control how much Spark logs, call .setLogLevel() on the Spark context.

In [4]:
# set appName and master

appName = "Spark - Setting Log Level"
master = "local"

# create Spark session

spark = SparkSession.builder \
    .appName(appName) \
    .master(master) \
    .getOrCreate()

# use spark setLogLevel(); "ERROR" hides WARN messages logged after this call

spark.sparkContext.setLogLevel("ERROR")

22/10/19 15:11:13 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


The log level is changed on the running session, so it applies only to this SparkSession and only to messages logged after the call. I'll update the project later, where I'll look at using the SQL language to communicate with Spark.

Let's check our Spark version.

In [5]:
spark.version 

'3.3.0'


Let's begin. Let's download the dataset from [Kaggle][1].

[1]: https://www.kaggle.com/datasets/anujsingh098/top-1000-imdb-movies        "Kaggle"

I really like movies and the movie industry, so we are going to use the IMDb Top 1000 Movies dataset.

In [6]:
# get the existing SparkSession (or create one)

spark = SparkSession.builder.getOrCreate()

In [7]:
# display the SparkSession summary

spark

Let's look at the first lines of the raw file.

In [8]:
!head -n5 data/imdb_top_1000.csv

Poster_Link,Series_Title,Released_Year,Certificate,Runtime,Genre,IMDB_Rating,Overview,Meta_score,Director,Star1,Star2,Star3,Star4,No_of_Votes,Gross
"https://m.media-amazon.com/images/M/MV5BMDFkYTc0MGEtZmNhMC00ZDIzLWFmNTEtODM1ZmRlYWMwMWFmXkEyXkFqcGdeQXVyMTMxODk2OTU@._V1_UX67_CR0,0,67,98_AL_.jpg",The Shawshank Redemption,1994,A,142 min,Drama,9.3,"Two imprisoned men bond over a number of years, finding solace and eventual redemption through acts of common decency.",80,Frank Darabont,Tim Robbins,Morgan Freeman,Bob Gunton,William Sadler,2343110,28341469
"https://m.media-amazon.com/images/M/MV5BM2MyNjYxNmUtYTAwNi00MTYxLWJmNWYtYzZlODY3ZTk3OTFlXkEyXkFqcGdeQXVyNzkwMjQ5NzM@._V1_UY98_CR1,0,67,98_AL_.jpg",The Godfather,1972,A,175 min,Crime,9.2,An organized crime dynasty's aging patriarch transfers control of his clandestine empire to his reluctant son.,100,Francis Ford Coppola,Marlon Brando,Al Pacino,James Caan,Diane Keaton,1620367,134966411
"https://m.media-amazon.com/images/M/MV5BMTMxNTMwO

Let's read the data into a dataframe using the following reader methods:
- format() specifies the format of the file;
- option() sets reader parameters, here the presence of a header row;
- load() takes the path to the file to be loaded.

In [9]:
# read dataframe

df = spark.read.format('csv').option('header', 'true').load('data/imdb_top_1000.csv')


In [10]:
# show the first 5 rows of df

df.show(5)

+--------------------+--------------------+-------------+-----------+-------+------+-----------+--------------------+----------+--------------------+--------------+--------------+-------------+--------------+-----------+---------+
|         Poster_Link|        Series_Title|Released_Year|Certificate|Runtime| Genre|IMDB_Rating|            Overview|Meta_score|            Director|         Star1|         Star2|        Star3|         Star4|No_of_Votes|    Gross|
+--------------------+--------------------+-------------+-----------+-------+------+-----------+--------------------+----------+--------------------+--------------+--------------+-------------+--------------+-----------+---------+
|https://m.media-a...|The Shawshank Red...|         1994|          A|142 min| Drama|        9.3|Two imprisoned me...|        80|      Frank Darabont|   Tim Robbins|Morgan Freeman|   Bob Gunton|William Sadler|    2343110| 28341469|
|https://m.media-a...|       The Godfather|         1972|          A|175 min

If the table is too wide to fit on the page, you can print each row vertically by passing vertical=True to the show() method.

In [11]:
# show 1 row of df
# use the vertical parameter

df.show(1, vertical=True)

-RECORD 0-----------------------------
 Poster_Link   | https://m.media-a... 
 Series_Title  | The Shawshank Red... 
 Released_Year | 1994                 
 Certificate   | A                    
 Runtime       | 142 min              
 Genre         | Drama                
 IMDB_Rating   | 9.3                  
 Overview      | Two imprisoned me... 
 Meta_score    | 80                   
 Director      | Frank Darabont       
 Star1         | Tim Robbins          
 Star2         | Morgan Freeman       
 Star3         | Bob Gunton           
 Star4         | William Sadler       
 No_of_Votes   | 2343110              
 Gross         | 28341469             
only showing top 1 row



Let's take a closer look at our dataframe using the Spark API functions.

Consider the first function, select(). Just as SELECT does in SQL, select() in the Spark API picks out certain columns.

In [12]:
# use select()

df.select('Series_Title', 'Released_Year', 'IMDB_Rating', 'Genre')

DataFrame[Series_Title: string, Released_Year: string, IMDB_Rating: string, Genre: string]

This output only describes the schema of the result: select() returns a new DataFrame and does not print any rows. To see the data, we need to call the show() method.

In [13]:
# use select()
## then use show()

df.select(
    'Series_Title', 'Released_Year', 'IMDB_Rating', 'Genre'
).show(5)

+--------------------+-------------+-----------+------+
|        Series_Title|Released_Year|IMDB_Rating| Genre|
+--------------------+-------------+-----------+------+
|The Shawshank Red...|         1994|        9.3| Drama|
|       The Godfather|         1972|        9.2| Crime|
|     The Dark Knight|         2008|          9|Action|
|The Godfather: Pa...|         1974|          9| Crime|
|        12 Angry Men|         1957|          9| Crime|
+--------------------+-------------+-----------+------+
only showing top 5 rows



Done! 

To work with columns as expressions, we import the pyspark.sql.functions module.

In [14]:
import pyspark.sql.functions as F

With these functions, columns are accessed as follows.

In [15]:
df.select(
    F.col('Series_Title'), F.col('Released_Year'),
    F.col('IMDB_Rating'), F.col('Genre'),
).show(5)

+--------------------+-------------+-----------+------+
|        Series_Title|Released_Year|IMDB_Rating| Genre|
+--------------------+-------------+-----------+------+
|The Shawshank Red...|         1994|        9.3| Drama|
|       The Godfather|         1972|        9.2| Crime|
|     The Dark Knight|         2008|          9|Action|
|The Godfather: Pa...|         1974|          9| Crime|
|        12 Angry Men|         1957|          9| Crime|
+--------------------+-------------+-----------+------+
only showing top 5 rows



Accessing columns through F.col() makes it convenient to build additional conditions or logical expressions on columns.

Let's use the .filter() method to show how it works.

In [16]:
# use .filter()

df\
    .filter('Genre = "Drama"')\
    .select(
        F.col('Series_Title'), F.col('Released_Year'),
        F.col('IMDB_Rating'), F.col('Genre'),
).show(5)

+--------------------+-------------+-----------+-----+
|        Series_Title|Released_Year|IMDB_Rating|Genre|
+--------------------+-------------+-----------+-----+
|The Shawshank Red...|         1994|        9.3|Drama|
|          Fight Club|         1999|        8.8|Drama|
|        Forrest Gump|         1994|        8.8|Drama|
|One Flew Over the...|         1975|        8.7|Drama|
|     Soorarai Pottru|         2020|        8.6|Drama|
+--------------------+-------------+-----------+-----+
only showing top 5 rows



We can also use compound expressions in a filter.

In [17]:
# use a compound condition in .filter()

df\
    .filter('Genre = "Drama" and Released_Year = "2019"')\
    .select(
        F.col('Series_Title'), F.col('Released_Year'),
        F.col('IMDB_Rating'), F.col('Genre'),
).show(5)

+--------------------+-------------+-----------+-----+
|        Series_Title|Released_Year|IMDB_Rating|Genre|
+--------------------+-------------+-----------+-----+
|                1917|         2019|        8.3|Drama|
|Miracle in cell NO.7|         2019|        8.3|Drama|
|Portrait de la je...|         2019|        8.1|Drama|
|           Gully Boy|         2019|          8|Drama|
|      Sound of Metal|         2019|        7.8|Drama|
+--------------------+-------------+-----------+-----+
only showing top 5 rows



Is 1917 really a drama?
I am surprised, but let's go on.

To avoid getting lost in a complex filter expression, you can apply the .filter() method twice. The conditions are applied in sequence: the source dataset is filtered by the first condition, then by the second, and finally the specified columns are selected.

In [18]:
# chain two .filter() calls

df\
    .filter('Genre = "Drama"')\
    .filter('Released_Year = "2019"')\
    .select(
        F.col('Series_Title'), F.col('Released_Year'),
        F.col('IMDB_Rating'), F.col('Genre'),
).show(5)

+--------------------+-------------+-----------+-----+
|        Series_Title|Released_Year|IMDB_Rating|Genre|
+--------------------+-------------+-----------+-----+
|                1917|         2019|        8.3|Drama|
|Miracle in cell NO.7|         2019|        8.3|Drama|
|Portrait de la je...|         2019|        8.1|Drama|
|           Gully Boy|         2019|          8|Drama|
|      Sound of Metal|         2019|        7.8|Drama|
+--------------------+-------------+-----------+-----+
only showing top 5 rows



Because F.col() returns a Python object, we can build filter expressions in Python and compare columns against external variables.

In [19]:
# fill in the external variables

genre = 'Drama'
year = 2019

# use .filter()

df\
    .filter(F.col('Genre') == genre)\
    .filter(F.col('Released_Year') == year)\
    .select(
        F.col('Series_Title'), F.col('Released_Year'),
        F.col('IMDB_Rating'), F.col('Genre'),
).show(5)

+--------------------+-------------+-----------+-----+
|        Series_Title|Released_Year|IMDB_Rating|Genre|
+--------------------+-------------+-----------+-----+
|                1917|         2019|        8.3|Drama|
|Miracle in cell NO.7|         2019|        8.3|Drama|
|Portrait de la je...|         2019|        8.1|Drama|
|           Gully Boy|         2019|          8|Drama|
|      Sound of Metal|         2019|        7.8|Drama|
+--------------------+-------------+-----------+-----+
only showing top 5 rows



The result is the same, but this approach opens up the possibility of automation.

Let's calculate the number of rows using the Spark functions.

In [20]:
# use .count()

df.count()

1000

Let's calculate the number of unique values in a column using the Spark functions. We will use the .distinct() method, which works like DISTINCT in SQL.

In [21]:
# use .distinct()
## then .count()

df.select('Genre').distinct().count()

14

There are 14 different genres in the dataframe! A big number for the film industry.

Now let's examine the grouping and aggregation functions. Alongside count() and distinct(), the Spark API provides the .groupBy() and .orderBy() methods. We will count the number of movies by genre in the dataframe.

In [22]:
# use .groupBy()

df.groupBy('Genre').count()

DataFrame[Genre: string, count: bigint]

*DataFrame[Genre: string, count: bigint]* - That's not what we need, right?
Spark returned a DataFrame object rather than its contents, so we need to call the .show() method.

In [23]:
# use .groupBy()
## then .show()

df.groupBy('Genre').count().show()

+---------+-----+
|    Genre|count|
+---------+-----+
|    Crime|  107|
| Thriller|    1|
|Adventure|   72|
|    Drama|  289|
|   Family|    2|
|  Fantasy|    2|
|  Mystery|   12|
|Animation|   82|
|Film-Noir|    3|
|   Horror|   11|
|  Western|    4|
|Biography|   88|
|   Comedy|  155|
|   Action|  172|
+---------+-----+



Let's see which genres are least present in the sample using the .groupBy() and .orderBy() operators together. 

In [24]:
# use .groupBy()
## then .orderBy()

df.groupBy('Genre').count().orderBy('count').show(5)

+---------+-----+
|    Genre|count|
+---------+-----+
| Thriller|    1|
|   Family|    2|
|  Fantasy|    2|
|Film-Noir|    3|
|  Western|    4|
+---------+-----+
only showing top 5 rows



Let's make the top 5 by the number of movies in the genre. We will use F.col() together with the .desc() modifier.

In [25]:
# use .groupBy()
## then .orderBy()

df.groupBy('Genre').count().orderBy(F.col('count').desc()).show(5)

+---------+-----+
|    Genre|count|
+---------+-----+
|    Drama|  289|
|   Action|  172|
|   Comedy|  155|
|    Crime|  107|
|Biography|   88|
+---------+-----+
only showing top 5 rows



The Drama. 

If we need to rename a column, we will use the .withColumnRenamed() function.

In [26]:
# use .withColumnRenamed()

df.withColumnRenamed('Poster_Link', 'Poster').select('Poster').show(5)

+--------------------+
|              Poster|
+--------------------+
|https://m.media-a...|
|https://m.media-a...|
|https://m.media-a...|
|https://m.media-a...|
|https://m.media-a...|
+--------------------+
only showing top 5 rows



Simple. 

With the .withColumn() method you can create a new column or replace an existing one. First, import the desired data types. Second, let's use the F.regexp_replace() function to strip the **min** suffix.

In [27]:
from pyspark.sql.types import IntegerType, FloatType

# use .withColumn()

df.withColumn(
    'Runtime', F.regexp_replace(F.col('Runtime'), ' min', '')
).select('Series_Title', 'Runtime').show(5)

+--------------------+-------+
|        Series_Title|Runtime|
+--------------------+-------+
|The Shawshank Red...|    142|
|       The Godfather|    175|
|     The Dark Knight|    152|
|The Godfather: Pa...|    202|
|        12 Angry Men|     96|
+--------------------+-------+
only showing top 5 rows



Then use .cast() to change the type. To keep working with the result, we need to save the dataframe into a new variable.

In [28]:
# use .withColumn()
## and .cast()

df_filtered = df.withColumn(
    'Runtime', F.col('Runtime').cast(IntegerType())
).select('*')

We use .cast() for the other columns in our dataframe.

In [29]:
# use .withColumn()
## and .cast()

df_filtered = df\
    .withColumn(
        'Runtime', F.col('Runtime').cast(IntegerType()))\
    .withColumn(
        'IMDB_Rating', F.col('IMDB_Rating').cast(FloatType()))\
    .withColumn(
        'Meta_score', F.col('Meta_score').cast(IntegerType())
).select('*')

Let's check that the column types have changed. Let's use the .printSchema() method to print a description of the data.

In [30]:
df_filtered.printSchema()

root
 |-- Poster_Link: string (nullable = true)
 |-- Series_Title: string (nullable = true)
 |-- Released_Year: string (nullable = true)
 |-- Certificate: string (nullable = true)
 |-- Runtime: integer (nullable = true)
 |-- Genre: string (nullable = true)
 |-- IMDB_Rating: float (nullable = true)
 |-- Overview: string (nullable = true)
 |-- Meta_score: integer (nullable = true)
 |-- Director: string (nullable = true)
 |-- Star1: string (nullable = true)
 |-- Star2: string (nullable = true)
 |-- Star3: string (nullable = true)
 |-- Star4: string (nullable = true)
 |-- No_of_Votes: string (nullable = true)
 |-- Gross: string (nullable = true)



We can also use the .describe() method to get summary statistics for the columns.

In [31]:
# use .describe

df_filtered.select('Runtime', 'IMDB_Rating', 'Meta_score').describe().show()

+-------+-------+------------------+------------------+
|summary|Runtime|       IMDB_Rating|        Meta_score|
+-------+-------+------------------+------------------+
|  count|      0|              1000|               830|
|   mean|   null| 7.949300034046173| 78.03734939759036|
| stddev|   null|0.2754912482773137|12.363212589581746|
|    min|   null|               7.6|                28|
|    max|   null|               9.3|               100|
+-------+-------+------------------+------------------+



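The mean of IMDB_Rating is 7.95 and the mean of Meta_score is 78.04. "That's a bingo!", as Christoph Waltz said in "Inglourious Basterds". Note that Runtime has a count of 0 and null statistics: the cast above was applied to the original strings such as "142 min", which cannot be parsed as integers, so every Runtime value became null. Applying F.regexp_replace() before .cast() avoids this.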

Let's save our dataframe in CSV format.

In [32]:
# use write

df_filtered.write.mode('overwrite')\
    .format('csv').save('results_filtered.csv')

Let's start the basic ETL pipeline.

## Second stage.

At this stage we will build a small ETL pipeline to understand how Spark and data interact. The dataframe is small enough to analyze easily with the Pandas library, but we will use Spark in order to examine the different methods and functions the engine provides.

We need to calculate for each year (Released_Year):

- number of movies;
- average IMDB rating;
- minimum IMDB rating;
- maximum IMDB rating.

In [33]:
from pyspark.sql import DataFrame

In [39]:
# extract data func

def extract_data(spark: SparkSession) -> DataFrame:
    path = 'data/imdb_top_1000.csv'
    return spark.read.option("header", "true").csv(path)

# transform data func

def transform_data(df: DataFrame) -> DataFrame:
    output = (
        df
        .groupBy("Released_Year")
        .agg(
            F.count("Released_Year").alias("count_movies"),
            F.round(F.avg(F.col("IMDB_Rating").cast(FloatType()))).alias("avg_imdb_rating"),
            F.min(F.col("IMDB_Rating").cast(FloatType())).alias("min_imdb_rating"),
            F.max(F.col("IMDB_Rating").cast(FloatType())).alias("max_imdb_rating"),
        )
        .orderBy(F.col("count_movies").desc())
    )
    return output

# load data func

def save_data(df: DataFrame) -> None:
    df.write.mode("overwrite").format("csv").option("header", "true").save("output.csv")

# ETL-pipeline func

def main():
    spark = SparkSession.builder.getOrCreate()
    df = extract_data(spark)
    output = transform_data(df)
    save_data(output)
    #spark.stop()

main()

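Simple! Note that F.round() is called here without a scale argument, so the average is rounded to zero decimal places; that is why every avg_imdb_rating value below is 8.0. Passing a scale, for example F.round(..., 2), would keep two decimals.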

In [42]:
spark.read.option("header", "true")\
    .csv("output.csv/part-00000-ecc2b750-1a83-4f76-943b-924fa7d498e1-c000.csv").show()

+-------------+------------+---------------+---------------+---------------+
|Released_Year|count_movies|avg_imdb_rating|min_imdb_rating|max_imdb_rating|
+-------------+------------+---------------+---------------+---------------+
|         2014|          32|            8.0|            7.6|            8.6|
|         2004|          31|            8.0|            7.6|            8.3|
|         2009|          29|            8.0|            7.6|            8.4|
|         2016|          28|            8.0|            7.6|            8.4|
|         2013|          28|            8.0|            7.6|            8.3|
|         2001|          27|            8.0|            7.6|            8.8|
|         2006|          26|            8.0|            7.6|            8.5|
|         2007|          26|            8.0|            7.6|            8.4|
|         2015|          25|            8.0|            7.6|            8.2|
|         2012|          24|            8.0|            7.6|            8.4|