# Spark Data Frames

1.Prepare Movies data: Extracting the Year and Genre from the Text
2.Prepare Users data: Loading a double delimited csv file
3.Prepare Ratings data: Programmatically specifying a schema for the data frame
4.Import Data from URL: Scala
5.Save table without defining DDL in Hive
6.Broadcast Variable example
7.Accumulator example


In [2]:
pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [4]:

from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import substring, col
home_dir = "/Users/jhansiboda/Desktop/MovieAnalytics/Movie-Analytics-project/MovieLens Dataset/"
print(home_dir)
# Create a SparkSession
spark = SparkSession.builder.getOrCreate()
df_movies = spark.read.option("header", "False").option("delimiter","::").option("inferSchema","true").csv(home_dir + "movies.csv")
df_ratings = spark.read.option("header", "False").option("delimiter","::").option("inferSchema","true").csv(home_dir + "ratings.csv")
df_users = spark.read.option("header", "False").option("delimiter","::").option("inferSchema","true").csv(home_dir + "users.csv")

print(df_movies)
df_movies.printSchema()
df_movies.dtypes
df_movies.show(10)
df_movies.count()
column_names = ["movie_id","title","genres"]
df_movie = df_movies.toDF(*column_names)

# Create an accumulator for counting
count_accumulator = spark.sparkContext.accumulator(0)


/Users/jhansiboda/Desktop/MovieAnalytics/Movie-Analytics-project/MovieLens Dataset/
DataFrame[_c0: int, _c1: string, _c2: string]
root
 |-- _c0: integer (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)

+---+--------------------+--------------------+
|_c0|                 _c1|                 _c2|
+---+--------------------+--------------------+
|  1|    Toy Story (1995)|Animation|Childre...|
|  2|      Jumanji (1995)|Adventure|Childre...|
|  3|Grumpier Old Men ...|      Comedy|Romance|
|  4|Waiting to Exhale...|        Comedy|Drama|
|  5|Father of the Bri...|              Comedy|
|  6|         Heat (1995)|Action|Crime|Thri...|
|  7|      Sabrina (1995)|      Comedy|Romance|
|  8| Tom and Huck (1995)|Adventure|Children's|
|  9| Sudden Death (1995)|              Action|
| 10|    GoldenEye (1995)|Action|Adventure|...|
+---+--------------------+--------------------+
only showing top 10 rows



# 1.Prepare Movies data: Extracting the Year and Genre from the Text 

In [6]:
df_movie.show()

+--------+--------------------+--------------------+
|movie_id|               title|              genres|
+--------+--------------------+--------------------+
|       1|    Toy Story (1995)|Animation|Childre...|
|       2|      Jumanji (1995)|Adventure|Childre...|
|       3|Grumpier Old Men ...|      Comedy|Romance|
|       4|Waiting to Exhale...|        Comedy|Drama|
|       5|Father of the Bri...|              Comedy|
|       6|         Heat (1995)|Action|Crime|Thri...|
|       7|      Sabrina (1995)|      Comedy|Romance|
|       8| Tom and Huck (1995)|Adventure|Children's|
|       9| Sudden Death (1995)|              Action|
|      10|    GoldenEye (1995)|Action|Adventure|...|
|      11|American Presiden...|Comedy|Drama|Romance|
|      12|Dracula: Dead and...|       Comedy|Horror|
|      13|        Balto (1995)|Animation|Children's|
|      14|        Nixon (1995)|               Drama|
|      15|Cutthroat Island ...|Action|Adventure|...|
|      16|       Casino (1995)|      Drama|Thr

In [8]:
from pyspark.sql.functions import col, min, max, sum, substring

df1 = df_movie.withColumn('year', substring('title', -5, 4).cast('int')).select(col('genres'), col('year'))
df1.printSchema()
df1.show()

root
 |-- genres: string (nullable = true)
 |-- year: integer (nullable = true)

+--------------------+----+
|              genres|year|
+--------------------+----+
|Animation|Childre...|1995|
|Adventure|Childre...|1995|
|      Comedy|Romance|1995|
|        Comedy|Drama|1995|
|              Comedy|1995|
|Action|Crime|Thri...|1995|
|      Comedy|Romance|1995|
|Adventure|Children's|1995|
|              Action|1995|
|Action|Adventure|...|1995|
|Comedy|Drama|Romance|1995|
|       Comedy|Horror|1995|
|Animation|Children's|1995|
|               Drama|1995|
|Action|Adventure|...|1995|
|      Drama|Thriller|1995|
|       Drama|Romance|1995|
|            Thriller|1995|
|              Comedy|1995|
|              Action|1995|
+--------------------+----+
only showing top 20 rows



# 2.Prepare Users data: Loading a double delimited csv file

In [10]:
column_names = ["user_id", "gender", "age", "occupation", "zip_code"]
df_user = df_users.toDF(*column_names)
df_user.show(5)

+-------+------+---+----------+--------+
|user_id|gender|age|occupation|zip_code|
+-------+------+---+----------+--------+
|      1|     F|  1|        10|   48067|
|      2|     M| 56|        16|   70072|
|      3|     M| 25|        15|   55117|
|      4|     M| 45|         7|   02460|
|      5|     M| 25|        20|   55455|
+-------+------+---+----------+--------+
only showing top 5 rows



# 3.Prepare Ratings data: Programmatically specifying a schema for the data frame

In [15]:
from pyspark.sql.types import StructType, StructField, IntegerType, LongType, FloatType
from pyspark.sql.functions import split, col
file_location = home_dir+"ratings.csv"
schema = StructType([
    StructField("user_id", IntegerType(), nullable=False),
    StructField("movie_id", IntegerType(), nullable=False),
    StructField("rating", FloatType(), nullable=False),
    StructField("timestamp", LongType(), nullable=False)
])

df = spark.read \
    .option("sep", "::") \
     .csv(file_location, schema=schema)
df.show()

+-------+--------+------+---------+
|user_id|movie_id|rating|timestamp|
+-------+--------+------+---------+
|      1|    1193|   5.0|978300760|
|      1|     661|   3.0|978302109|
|      1|     914|   3.0|978301968|
|      1|    3408|   4.0|978300275|
|      1|    2355|   5.0|978824291|
|      1|    1197|   3.0|978302268|
|      1|    1287|   5.0|978302039|
|      1|    2804|   5.0|978300719|
|      1|     594|   4.0|978302268|
|      1|     919|   4.0|978301368|
|      1|     595|   5.0|978824268|
|      1|     938|   4.0|978301752|
|      1|    2398|   4.0|978302281|
|      1|    2918|   4.0|978302124|
|      1|    1035|   5.0|978301753|
|      1|    2791|   4.0|978302188|
|      1|    2687|   3.0|978824268|
|      1|    2018|   4.0|978301777|
|      1|    3105|   5.0|978301713|
|      1|    2797|   4.0|978302039|
+-------+--------+------+---------+
only showing top 20 rows



# 4.Import Data from URL: Python

In [16]:
pip install pandas

Note: you may need to restart the kernel to use updated packages.


In [18]:
import pandas as pd
url =  home_dir+"users.csv"
df = pd.read_csv(url, sep="::", engine="python", header=None)
df.head()
  

Unnamed: 0,0,1,2,3,4
0,1,F,1,10,48067
1,2,M,56,16,70072
2,3,M,25,15,55117
3,4,M,45,7,2460
4,5,M,25,20,55455


In [19]:
schema = StructType([
    StructField("user_id", IntegerType(), nullable=True),
    StructField("gender", StringType(), nullable=True),
    StructField("age", IntegerType(), nullable=True),
     StructField("occupation", IntegerType(), nullable=True),
    StructField("zip_code", StringType(), nullable=True)
])
spark_df = spark.createDataFrame(df, schema=schema)

In [20]:
spark_df.show()

+-------+------+---+----------+--------+
|user_id|gender|age|occupation|zip_code|
+-------+------+---+----------+--------+
|      1|     F|  1|        10|   48067|
|      2|     M| 56|        16|   70072|
|      3|     M| 25|        15|   55117|
|      4|     M| 45|         7|   02460|
|      5|     M| 25|        20|   55455|
|      6|     F| 50|         9|   55117|
|      7|     M| 35|         1|   06810|
|      8|     M| 25|        12|   11413|
|      9|     M| 25|        17|   61614|
|     10|     F| 35|         1|   95370|
|     11|     F| 25|         1|   04093|
|     12|     M| 25|        12|   32793|
|     13|     M| 45|         1|   93304|
|     14|     M| 35|         0|   60126|
|     15|     M| 25|         7|   22903|
|     16|     F| 35|         0|   20670|
|     17|     M| 50|         1|   95350|
|     18|     F| 18|         3|   95825|
|     19|     M|  1|        10|   48073|
|     20|     M| 25|        14|   55113|
+-------+------+---+----------+--------+
only showing top

In [21]:
spark_df.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- occupation: integer (nullable = true)
 |-- zip_code: string (nullable = true)



In [23]:
from pyspark.sql.functions import *
spark_df.withColumn('zip_code', col('zip_code').cast('int'))

spark_df = spark_df.withColumn('zip_code', col('zip_code').cast('int'))

spark_df.printSchema()


root
 |-- user_id: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- occupation: integer (nullable = true)
 |-- zip_code: integer (nullable = true)



# 5.Save table without defining DDL in Hive

In [24]:
spark_df.write.mode("overwrite").saveAsTable('name')
spark.sql("select * from name").show()

23/06/21 09:32:34 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

+-------+------+---+----------+--------+
|user_id|gender|age|occupation|zip_code|
+-------+------+---+----------+--------+
|   3776|     F| 25|         4|   50265|
|   3777|     M| 18|         4|   23112|
|   3778|     M| 25|         0|   61832|
|   3779|     F| 45|         2|   98383|
|   3780|     M|  1|         0|   46979|
|   3781|     F| 56|        11|   54401|
|   3782|     F| 25|         0|   94602|
|   3783|     M| 25|         7|   97267|
|   3784|     F| 25|         1|   98027|
|   3785|     F| 25|        20|   93401|
|   3786|     F| 25|         0|   94115|
|   3787|     F| 25|         0|   94703|
|   3788|     F| 18|         0|   94587|
|   3789|     F| 25|         0|   94110|
|   3790|     F| 25|        17|   94618|
|   3791|     M| 50|        18|   60516|
|   3792|     M| 25|         6|   68108|
|   3793|     M| 18|         4|   21093|
|   3794|     F| 18|         4|   53211|
|   3795|     F| 18|         4|   91405|
+-------+------+---+----------+--------+
only showing top

In [25]:
spark.sql("show tables").show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|  default|     name|      false|
+---------+---------+-----------+



# 6.Broadcast Variable example

In [27]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast
import time

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()

# Create two sample DataFrames :df_ratings = home_dir+"ratings.csv"
df_ratings = df_ratings

column_rating = ["user_id", "movie_id", "rating", "timestamp"]
df_rating = movies.toDF(*column_rating)

column_names = ["movie_id","title","genres"]
df_movie = df_movie.toDF(*column_names)

# Defining broadcast dataframe
broadcast_movie_df = broadcast(df_movie)

# Capture the start timestamp
start_time_normal_df = time.time()

# Perform full join on the 'movie_id' column
df_full_join = df_rating.join(df_movie, on = "movie_id", how = "full")

# Capture the end timestamp
end_time_normal_df = time.time()

# Capture start time
start_time_broadcast_df = time.time()

#Perform full join on broadcast df
sd_broadcast_full_join  = df_rating.join(broadcast_movie_df, on = "movie_id", how = "full")

# Capture the end timestamp
end_time_broadcast_df = time.time()

# Calculate the time taken

time_taken_normal = end_time_normal_df - start_time_normal_df
time_taken_broadcast = end_time_broadcast_df - start_time_broadcast_df

# Show the result
#df_full_join.show()

# Print the time taken
print(f"Time taken by normal join operation: {time_taken_normal} seconds")

print(f"Time taken by broadcast join operation: {time_taken_broadcast} seconds")

Time taken by normal join operation: 0.018440961837768555 seconds
Time taken by broadcast join operation: 0.009475946426391602 seconds


# 7.Accumulator example


In [28]:
column_names = ["movie_id","title","genres"]
df_movie = df_movie.toDF(*column_names)

# Create an accumulator for counting
count_accumulator = spark.sparkContext.accumulator(0)

# Find number of movies released between year 1920 to 1930

df_movie_with_year = df_movie.withColumn("year", substring('title', -5, 4).cast("int"))

df_movie_with_year.filter(col("year")== 1920).show()
df_movie_with_year.foreach(lambda row: count_accumulator.add(1) if row.year == 1920 else None)

v = count_accumulator.value
print(v)

+--------+--------------------+------+----+
|movie_id|               title|genres|year|
+--------+--------------------+------+----+
|    3231| Saphead, The (1920)|Comedy|1920|
|    3309|Dog's Life, A (1920)|Comedy|1920|
+--------+--------------------+------+----+

2
