### Data Wrangling with Spark - My practise

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, asc, desc
from pyspark.sql.functions import sum as Fsum # sum(), min(), max() here are the built-in functions
from pyspark.sql.types import StringType, IntegerType

import datetime
import pandas as pd
import numpy as np
%matplotlib inline
import matplotlib.pyplot as plt

In [2]:
# Create SparkSession Object
spark = SparkSession.builder.appName("Wrangling Data with Spark").getOrCreate()

In [3]:
import os,sys
os.environ['PYSPARK_DRIVER_PYTHON'],os.environ['PYSPARK_PYTHON'],sys.executable,sys.version_info

('C:\\Users\\krish\\AppData\\Local\\Programs\\Python\\Python310\\python.exe',
 'C:\\Users\\krish\\AppData\\Local\\Programs\\Python\\Python310\\python.exe',
 'C:\\Users\\krish\\AppData\\Local\\Programs\\Python\\Python310\\python.exe',
 sys.version_info(major=3, minor=10, micro=1, releaselevel='final', serial=0))

In [12]:
# read the json file as pandas data frame, convert this pandas dataframe into a csv file
import pandas as pd
p_df = pd.read_json(r'data/sparkify_log_small.json',lines=True)
p_df.to_csv(r'data/sparkify_log_small.csv', index = None)

In [None]:
# manipulating the os.environment variables i.e. 'PYSPARK_PYTHON', 'PYSPARK_DRIVER_PYTHON'
# import os,sys
# print(os.environ['PYSPARK_PYTHON'],'\n',os.environ['PYSPARK_DRIVER_PYTHON'],'\n',sys.executable)

# os.environ['PYSPARK_PYTHON'] = sys.executable
# os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
# print(os.environ['PYSPARK_PYTHON'],'\n',os.environ['PYSPARK_DRIVER_PYTHON'],'\n',sys.executable)

In [13]:
# Read the json log file into a Spark Data frame
input_path = "data/sparkify_log_small.csv"
user_log_df = spark.read.csv(input_path,sep=',',inferSchema=True, header=True)

#### Data Exploration
- All next cells to slice, dice and explore the above Spark data frame

In [14]:
# See 4 rows of the data frame
# user_log_df.take(4)
user_log_df.show()

+-------------+------+---------+--------+---------+------+------+-----+-------------+--------------------+--------------------+--------------------+---------+---------------+---------------+---------------+--------------------+--------------------+
|           ts|userId|sessionId|    page|     auth|method|status|level|itemInSession|            location|           userAgent|            lastName|firstName|   registration|         gender|         artist|                song|              length|
+-------------+------+---------+--------+---------+------+------+-----+-------------+--------------------+--------------------+--------------------+---------+---------------+---------------+---------------+--------------------+--------------------+
|1513720872284|  1046|     5132|NextSong|Logged In|   PUT|   200| paid|          112|Charlotte-Concord...|"""Mozilla/5.0 (W...| like Gecko) Chro...| Matthews|        Kenneth|1509380319284.0|              M|       Showaddywaddy|Christmas Tears W...|
|151

In [15]:
# Print the schema of the data frame
user_log_df.printSchema()

root
 |-- ts: long (nullable = true)
 |-- userId: integer (nullable = true)
 |-- sessionId: integer (nullable = true)
 |-- page: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- method: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- level: string (nullable = true)
 |-- itemInSession: integer (nullable = true)
 |-- location: string (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- registration: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- artist: string (nullable = true)
 |-- song: string (nullable = true)
 |-- length: string (nullable = true)



In [16]:
# Describe the data frame i.e. to see the statistics of the numerical columns
user_log_df.describe().show()

+-------+--------------------+------------------+------------------+-------+----------+------+------------------+-----+------------------+------------+--------------------+--------------------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+
|summary|                  ts|            userId|         sessionId|   page|      auth|method|            status|level|     itemInSession|    location|           userAgent|            lastName|firstName|        registration|              gender|              artist|                song|              length|
+-------+--------------------+------------------+------------------+-------+----------+------+------------------+-----+------------------+------------+--------------------+--------------------+---------+--------------------+--------------------+--------------------+--------------------+--------------------+
|  count|               10000|              9664|             10000|  100

In [10]:
# Let us describe just one column of the data frame
user_log_df.describe("artist").show()

+-------+-----------------+
|summary|           artist|
+-------+-----------------+
|  count|             8347|
|   mean|            461.0|
| stddev|            300.0|
|    min|              !!!|
|    max|ÃÂlafur Arnalds|
+-------+-----------------+



In [9]:
user_log_df.describe('sessionId').show()

+-------+------------------+
|summary|         sessionId|
+-------+------------------+
|  count|             10000|
|   mean|         4436.7511|
| stddev|2043.1281541827561|
|    min|                 9|
|    max|              7144|
+-------+------------------+



In [12]:
## Count the rows in the data frame
user_log_df.count()

10000

In [17]:
## select 'page' column, drop the duplicates in 'page'column and sort them 
user_log_df.select('page').dropDuplicates().sort('page').show()

+----------------+
|            page|
+----------------+
|           About|
|       Downgrade|
|           Error|
|            Help|
|            Home|
|           Login|
|          Logout|
|        NextSong|
|   Save Settings|
|        Settings|
|Submit Downgrade|
|  Submit Upgrade|
|         Upgrade|
+----------------+



In [18]:
## select multiple fields and filter out 1046 userID
user_log_df.select(['userId','firstname','page','song']).where(user_log_df.userId == 1046).show()

+------+---------+--------+------------------+
|userId|firstname|    page|              song|
+------+---------+--------+------------------+
|  1046| Matthews|NextSong|     Showaddywaddy|
|  1046| Matthews|NextSong|     Darius Rucker|
|  1046| Matthews|NextSong|      Public Enemy|
|  1046| Matthews|NextSong|        Jag Panzer|
|  1046| Matthews|NextSong|           Boyzone|
|  1046| Matthews|NextSong|  Hollywood Undead|
|  1046| Matthews|NextSong|   Jimmy Eat World|
|  1046| Matthews|    Home|              null|
|  1046| Matthews|NextSong|             Wilco|
|  1046| Matthews|NextSong|Fountains Of Wayne|
|  1046| Matthews|NextSong|        Miike Snow|
|  1046| Matthews|  Logout|              null|
|  1046| Matthews|    Home|              null|
|  1046| Matthews|NextSong|   Yeah Yeah Yeahs|
|  1046| Matthews|NextSong|       Linkin Park|
|  1046| Matthews|NextSong|          Coldplay|
|  1046| Matthews|NextSong|        Jill Scott|
|  1046| Matthews|NextSong|          Glassjaw|
|  1046| Matt

In [19]:
# You can also use 'filter' function as an alias to 'where'
user_log_df.select(['userId','firstname','page','song']).filter((user_log_df.userId == 1046) & (user_log_df.page == 'NextSong')).show()

+------+---------+--------+------------------+
|userId|firstname|    page|              song|
+------+---------+--------+------------------+
|  1046| Matthews|NextSong|     Showaddywaddy|
|  1046| Matthews|NextSong|     Darius Rucker|
|  1046| Matthews|NextSong|      Public Enemy|
|  1046| Matthews|NextSong|        Jag Panzer|
|  1046| Matthews|NextSong|           Boyzone|
|  1046| Matthews|NextSong|  Hollywood Undead|
|  1046| Matthews|NextSong|   Jimmy Eat World|
|  1046| Matthews|NextSong|             Wilco|
|  1046| Matthews|NextSong|Fountains Of Wayne|
|  1046| Matthews|NextSong|        Miike Snow|
|  1046| Matthews|NextSong|   Yeah Yeah Yeahs|
|  1046| Matthews|NextSong|       Linkin Park|
|  1046| Matthews|NextSong|          Coldplay|
|  1046| Matthews|NextSong|        Jill Scott|
|  1046| Matthews|NextSong|          Glassjaw|
|  1046| Matthews|NextSong|     Michael Cretu|
|  1046| Matthews|NextSong|     Lonnie Gordon|
|  1046| Matthews|NextSong|          Vangelis|
|  1046| Matt

In [20]:
# let us actually start an action instead of just displaying records using show() function above
user_log_df.select(['userId','firstname','page','song']).filter(user_log_df.userId == 1046).collect()

[Row(userId=1046, firstname='Matthews', page='NextSong', song='Showaddywaddy'),
 Row(userId=1046, firstname='Matthews', page='NextSong', song='Darius Rucker'),
 Row(userId=1046, firstname='Matthews', page='NextSong', song='Public Enemy'),
 Row(userId=1046, firstname='Matthews', page='NextSong', song='Jag Panzer'),
 Row(userId=1046, firstname='Matthews', page='NextSong', song='Boyzone'),
 Row(userId=1046, firstname='Matthews', page='NextSong', song='Hollywood Undead'),
 Row(userId=1046, firstname='Matthews', page='NextSong', song='Jimmy Eat World'),
 Row(userId=1046, firstname='Matthews', page='Home', song=None),
 Row(userId=1046, firstname='Matthews', page='NextSong', song='Wilco'),
 Row(userId=1046, firstname='Matthews', page='NextSong', song='Fountains Of Wayne'),
 Row(userId=1046, firstname='Matthews', page='NextSong', song='Miike Snow'),
 Row(userId=1046, firstname='Matthews', page='Logout', song=None),
 Row(userId=1046, firstname='Matthews', page='Home', song=None),
 Row(userId=10

In [14]:
# Let us understand what is the data type of the above output
x = user_log_df.select(['userId','firstname','page','song']).filter(user_log_df.userId == 1046).collect()
type(x)

list

#### Calculating statistics by hour

In [21]:
# Create a user-defined function which gives hour value from the input given to it
get_hour = udf(lambda x : x.datetime.datetime.fromtimestamp(x/1000.0))

In [22]:
# Add a new column called 'hour' to the Spark data frame
# withColumn() of a data frame needs the name of the new column followed by the expression how to compute it
user_log_df = user_log_df.withColumn('hour',get_hour(user_log_df.ts))

In [23]:
user_log_df.printSchema()

root
 |-- ts: long (nullable = true)
 |-- userId: integer (nullable = true)
 |-- sessionId: integer (nullable = true)
 |-- page: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- method: string (nullable = true)
 |-- status: integer (nullable = true)
 |-- level: string (nullable = true)
 |-- itemInSession: integer (nullable = true)
 |-- location: string (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- registration: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- artist: string (nullable = true)
 |-- song: string (nullable = true)
 |-- length: string (nullable = true)
 |-- hour: string (nullable = true)



In [22]:
# user_log_df.take(3)

In [24]:
# see the head of it
songs_in_hour = user_log_df.filter(user_log_df.page == "NextSong").groupby(user_log_df.hour).count().orderBy(user_log_df.hour.cast("float"))

In [25]:
# songs_in_hour.show()
type(songs_in_hour)


pyspark.sql.dataframe.DataFrame

In [26]:
songs_in_hour_pd = songs_in_hour.toPandas()
songs_in_hour_pd['hour'] = pd.to_numeric(songs_in_hour_pd.hour)

Py4JJavaError: An error occurred while calling o113.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 14.0 failed 1 times, most recent failure: Lost task 0.0 in stage 14.0 (TID 12) (10.0.0.152 executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:585)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:567)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:91)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:68)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:498)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:830)
Caused by: java.io.EOFException
	at java.base/java.io.DataInputStream.readInt(DataInputStream.java:397)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:76)
	... 22 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2352)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2351)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2351)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1109)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1109)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1109)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2591)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2533)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:585)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:567)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:91)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:68)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:498)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.agg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:830)
Caused by: java.io.EOFException
	at java.base/java.io.DataInputStream.readInt(DataInputStream.java:397)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:76)
	... 22 more
