## Using Spark Cluster with shared data in Docker

### Handle Spark DataFrames

#### Install Java

Visit [https://www.java.com/](https://www.java.com/) to download and install Java.

#### Create a Python Virtual Environment

Select the Python interpreter for the notebook: choose Python Environment from the drop-down menu and create a Python virtual environment.

In [None]:
!pip install pyspark pandas pyarrow

#### Download the CSV file to a local directory

In [None]:
import urllib.request
import zipfile
from os import remove

url = 'https://www.kaggle.com/api/v1/datasets/download/chaitanyahivlekar/large-movie-dataset'
urllib.request.urlretrieve(url,'movies.zip')

with zipfile.ZipFile('movies.zip', 'r') as zip_ref:
    zip_ref.extractall('./')

remove('movies.zip')

#### Connect to Spark and create a Spark session

In [None]:
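from pyspark.sql import SparkSession

# local[*] runs Spark inside this Python process, using all CPU cores.
# In local mode the executor runs inside the driver JVM, so spark.driver.memory
# is the setting that takes effect; spark.executor.memory matters on a cluster.
spark = SparkSession.builder.appName("MyApp") \
    .master("local[*]") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()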


#### Reading data
##### Create a Spark DataFrame from a CSV file

To create a Spark DataFrame from a CSV file, use `spark.read.csv`. Setting `inferSchema=True` lets Spark infer the type of each column, but it requires an extra pass over the data, which can be slow for large files.

In [None]:
sdf = spark.read.csv("movies_dataset.csv", header=True, inferSchema=True)

#### Viewing Data

To see the schema, use the `.printSchema()` method.

In [None]:
sdf.printSchema()

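Show the first rows of the Spark DataFrame with the `.show()` method. By default it prints 20 rows and truncates long strings; `truncate=False` prints the full values.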

In [None]:
sdf.show(truncate=False)


Return the first 5 rows as a list of `Row` objects.

In [None]:
sdf.take(5)

Count the total number of rows.

In [None]:
sdf.count()

View the column names.

In [None]:
sdf.columns

Show summary statistics (count, mean, standard deviation, min, and max) for each column.

In [None]:
sdf.describe().show()

#### View data with selected columns

In [None]:
sdf.select("Movie_Name").show(truncate=False)

In [None]:
sdf.select("Movie_Name", "Rating").show(truncate=False)

#### Filtering data

To filter rows, use the DataFrame `filter` method (`where` is an alias for it).


In [None]:
from pyspark.sql.functions import col

sdf.filter( col("Rating") == 5 ).count()

Conditions can be combined with `&` (and) and `|` (or). Wrap each comparison in parentheses, because in Python these operators bind more tightly than `==`.

In [None]:
sdf.filter( ( col("Rating") == 5 ) & col('Genre').contains("Comedy") ).show()

In [None]:
sdf.filter( ( col("Rating") == 5 ) & col('Genre').contains("Comedy") ).count()

In [None]:
sdf.filter( (col('Genre').contains("Action")) | col('Genre').contains("Comedy") ).show()

In [None]:
sdf.filter( (col('Genre').contains("Action")) | col('Genre').contains("Comedy") ).count()

In [None]:
# Keep the filtered subset in memory, since it is reused below
sdf_top = sdf.filter( (col("Rating") == 5) & col('Genre').contains("Comedy") ).cache()



In [None]:
sdf_top.show()

#### Transform data

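Use `withColumn` to add or replace a column and `withColumnRenamed` to rename one. Both return a new DataFrame and leave `sdf` unchanged, as the `sdf.show()` cells below confirm.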

In [None]:
sdf.withColumn("Rating", sdf["Rating"] / 5 ).show()

In [None]:
sdf.show()

In [None]:
sdf.withColumnRenamed("Movie_Name", "Title") \
    .withColumnRenamed("Rating", "Score") \
    .show()

In [None]:
sdf.show(5)

#### Apply user-defined functions

In [None]:
import numpy as np
import pandas as pd
from pyspark.sql.functions import pandas_udf

# A pandas UDF receives a batch of values as a pandas Series,
# so a vectorized NumPy function is used instead of math.log row by row
@pandas_udf('double')
def pandas_log(series: pd.Series) -> pd.Series:
    return np.log(series)

sdf.withColumn("log_rating", pandas_log(sdf["Rating"])).show(5)


In [None]:
from pyspark.sql.types import ArrayType, StringType

@pandas_udf(ArrayType(StringType()))
def pandas_split(series: pd.Series) -> pd.Series:
    return series.apply(lambda x: x.split("|") if isinstance(x, str) else None)

sdf.withColumn("genre_list", pandas_split(sdf["Genre"])).show(5)


#### Grouping data

To summarize the data, we can use the aggregation functions of the Spark DataFrame API. For example, `groupBy` groups the rows by the `Movie_Name` column, and `agg` applies aggregate functions such as `count` or `avg` to each group.


Average rating per movie

In [None]:
sdf.groupBy("Movie_Name").avg("Rating").show()

Number of user ratings per movie

In [None]:
sdf.groupBy("Movie_Name").count().show()

Sum of ratings per movie

In [None]:
sdf.groupBy("Movie_Name").sum("Rating").show()

In [None]:
import pyspark.sql.functions as sf
sdf.groupBy("Movie_Name").agg(
    sf.avg("Rating").alias("avg_rating"),
    sf.count("*").alias("total_ratings"),
).show()

In [134]:
import pyspark.sql.functions as sf
sdf.groupBy("Movie_Name").agg(
    sf.avg("Rating").alias("avg_rating"),
    sf.count("*").alias("total_ratings"),
).filter(sf.col("total_ratings") > 1000).show() 



+--------------------+------------------+-------------+
|          Movie_Name|        avg_rating|total_ratings|
+--------------------+------------------+-------------+
|Men in Black (a.k...|3.5817083457378187|        40308|
|       Quills (2000)|3.5385925085130534|         2643|
|O Brother, Where ...|3.8768003783481664|        23259|
|22 Jump Street (2...|3.4989462592202316|         3796|
|   Deadpool 2 (2018)| 3.781831019063309|         4249|
|Snow White and th...|3.5823021181716834|        17940|
|Night of the Livi...| 3.653091817613991|         8005|
|       Psycho (1960)| 4.067213040729703|        22146|
|   Annie Hall (1977)| 4.042453118318979|        16371|
|    Elizabeth (1998)|3.8752777452929483|         8551|
|Problem Child (1990)| 2.097872340425532|         1645|
|Don't Tell Mom th...| 2.725230102869518|         1847|
|When We Were King...| 4.132270693512305|         3576|
|Heavenly Creature...|3.8224384098544233|         7144|
|First Blood (Ramb...|3.5759786219846887|       

                                                                                

Count the 5-star comedy ratings per movie in `sdf_top`.

In [None]:
from pyspark.sql import functions as sf

sdf_top.groupBy("Movie_Name").agg(
    sf.count("*").alias("total_ratings"),
).show()

Here, we filter the dataset to rows whose `Genre` contains `Comedy`, group by `Movie_Name` (the title already includes the year), and compute the total number of ratings and the average rating.

In [None]:
sdf.filter(sf.col("Genre").contains("Comedy")) \
    .groupBy("Movie_Name") \
    .agg(
        sf.count("*").alias("total_ratings"),
        sf.avg("Rating").alias("avg_rating"),
    ) \
    .show()

Add a `sum` of the `Rating` column and sort by the number of ratings.

In [None]:
sdf.filter(sf.col("Genre").contains("Comedy")) \
    .groupBy("Movie_Name") \
    .agg(
        sf.count("*").alias("total_ratings"),
        sf.avg("Rating").alias("avg_rating"),
        sf.sum("Rating").alias("sum_rating"),
    ) \
    .orderBy("total_ratings", ascending=False) \
    .show()

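#### Ordering dataset

Use `orderBy` to sort the result; `ascending=False` sorts in descending order. Here, movies with more than 1,000 ratings are sorted by their number of ratings.
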
In [None]:
sdf.groupBy("Movie_Name") \
    .agg(
       sf.avg("Rating").alias("avg_rating"),
       sf.count("*").alias("total_ratings"),
    )\
    .filter(sf.col("total_ratings") > 1000)\
    .orderBy("total_ratings", ascending=False) \
    .show()

                                                                                

3790

#### Collect data from Spark into a pandas DataFrame

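After filtering, grouping, aggregating, and ordering, we can collect the result from Spark with the `.toPandas()` method. `toPandas()` loads every row into the driver's memory, so use it only after the data has been reduced to a manageable size.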

In [None]:
from pyspark.sql import functions as sf

df = sdf.groupBy("Movie_Name") \
    .agg(
       sf.avg("Rating").alias("avg_rating"),
       sf.count("*").alias("total_ratings"),
    )\
    .filter(sf.col("total_ratings") > 1000)\
    .orderBy("total_ratings", ascending=False)\
    .toPandas()

df.to_csv("movies.csv", index=False)

                                                                                

|      | Movie_Name | avg_rating | total_ratings |
|------|------------|------------|---------------|
| 0 | Forrest Gump (1994) | 4.048011 | 81491 |
| 1 | Shawshank Redemption, The (1994) | 4.413576 | 81482 |
| 2 | Pulp Fiction (1994) | 4.188912 | 79672 |
| 3 | Silence of the Lambs, The (1991) | 4.151342 | 74127 |
| 4 | Matrix, The (1999) | 4.154099 | 72674 |
| ... | ... | ... | ... |
| 3785 | Armour of God II: Operation Condor (Operation ... | 3.353440 | 1003 |
| 3786 | Last Waltz, The (1978) | 3.977545 | 1002 |
| 3787 | Interstate 60 (2002) | 3.930140 | 1002 |
| 3788 | Child's Play 3 (1991) | 2.052947 | 1001 |


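Collect the result as a list of `Row` objects with `.collect()`. Like `toPandas()`, this brings all rows to the driver.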

In [None]:
data_collected = sdf.filter(sf.col("Genre").contains("Comedy")) \
    .groupBy("Movie_Name") \
    .agg(
        sf.count("*").alias("total_ratings"),
        sf.avg("Rating").alias("avg_rating"),
        sf.sum("Rating").alias("sum_rating"),
    ) \
    .filter(sf.col("avg_rating") > 4) \
    .orderBy("total_ratings", ascending=False) \
    .collect()
data_collected

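Apply a pandas function to each group with `applyInPandas`. Here, each user's ratings are centered by subtracting that user's mean rating, and the output keeps the schema of `sdf`.

In [None]:
def subtract_mean(pandas_df):
    # pandas_df holds all the rows of one user
    return pandas_df.assign(Rating=pandas_df.Rating - pandas_df.Rating.mean())

result_df = sdf.groupBy('User_Id').applyInPandas(subtract_mean, schema=sdf.schema)
result_df.show()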

In [None]:
spark.stop()