In [None]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz

In [None]:
!tar xf spark-3.3.1-bin-hadoop3.tgz
!pip install -q findspark
!pip install pyspark

In [None]:
from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark import SparkFiles


spark = SparkSession \
       .builder \
       .appName("Our First Spark example") \
       .getOrCreate()

spark

In [None]:
spark=SparkSession.builder.getOrCreate()
## ensure to use the link of the raw file of the csv
url = "https://raw.githubusercontent.com/Munasib14/SparkMl-and-SparkSQL/master/SparkMl/HeartStroke.csv" # Make sure the url is the raw version of the file on GitHub

In [None]:
spark.sparkContext.addFile(url)

In [1]:
# import findspark

# findspark.init("/opt/cloudera/parcels/CDH-6.2.1-1.cdh6.2.1.p0.1425774/lib/spark")

import pyspark

sc= pyspark.SparkContext()

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

In [2]:
df = spark.read.format("csv") \
                         .option("header", "true") \
                         .option("inferSchema", "true") \
                         .load("Cars_Sale.csv")


In [3]:
df.printSchema()

root
 |-- Manufacturer: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Vehicle_type: string (nullable = true)
 |-- Latest_Launch: string (nullable = true)
 |-- Units_Price: double (nullable = true)
 |-- Units_Sold : double (nullable = true)
 |-- Cost_incurred: double (nullable = true)
 |-- Revenue: double (nullable = true)
 |-- Cost: double (nullable = true)
 |-- Profit: double (nullable = true)



In [4]:
print(df)

DataFrame[Manufacturer: string, Model: string, Vehicle_type: string, Latest_Launch: string, Units_Price: double, Units_Sold : double, Cost_incurred: double, Revenue: double, Cost: double, Profit: double]


In [5]:
df.show()

+------------+-----------+------------+-------------+-----------+-----------+-------------+-----------+-----------+-----------+
|Manufacturer|      Model|Vehicle_type|Latest_Launch|Units_Price|Units_Sold |Cost_incurred|    Revenue|       Cost|     Profit|
+------------+-----------+------------+-------------+-----------+-----------+-------------+-----------+-----------+-----------+
|       Acura|    Integra|   Passenger|   02-02-2012|       21.5|     16.919|        16.36|   363.7585|  276.79484|   86.96366|
|       Acura|         TL|   Passenger|   06-03-2011|       28.4|     39.384|       19.875|  1118.5056|    782.757|   335.7486|
|       Acura|         CL|   Passenger|   01-04-2012|      35.44|     14.114|       18.225|  500.20016|  257.22765|  242.97251|
|       Acura|         RL|   Passenger|   03-10-2011|       42.0|      8.588|       29.725|    360.696|   255.2783|   105.4177|
|        Audi|         A4|   Passenger|   10-08-2011|      23.99|     20.397|       22.255|  489.32403| 

In [6]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, FloatType, LongType, IntegerType, DateType

# define the structure
schema = StructType([
    StructField("Manufacturer", StringType(),True),
    StructField("Model", StringType(),True),
    StructField("Vehicle_type", StringType(),True),
    StructField("Latest_Launch", StringType(),True),
    StructField("Units_Sold", DoubleType(),True),
    StructField("Units_Price", DoubleType(),True),
    StructField("Cost_incurred", DoubleType(),True),
    StructField("Revenue", DoubleType(),True),
    StructField("Cost", DoubleType(),True),
    StructField("Profit", DoubleType(),True)
])



# read the file by using the defined schema
df1 = spark.read.format("csv").option("header", "true").schema(schema).load("Cars_Sale.csv")

# display the schema
df1.printSchema()


root
 |-- Manufacturer: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- Vehicle_type: string (nullable = true)
 |-- Latest_Launch: string (nullable = true)
 |-- Units_Sold: double (nullable = true)
 |-- Units_Price: double (nullable = true)
 |-- Cost_incurred: double (nullable = true)
 |-- Revenue: double (nullable = true)
 |-- Cost: double (nullable = true)
 |-- Profit: double (nullable = true)



In [7]:
df1.show()

+------------+-----------+------------+-------------+----------+-----------+-------------+-----------+-----------+-----------+
|Manufacturer|      Model|Vehicle_type|Latest_Launch|Units_Sold|Units_Price|Cost_incurred|    Revenue|       Cost|     Profit|
+------------+-----------+------------+-------------+----------+-----------+-------------+-----------+-----------+-----------+
|       Acura|    Integra|   Passenger|   02-02-2012|      21.5|     16.919|        16.36|   363.7585|  276.79484|   86.96366|
|       Acura|         TL|   Passenger|   06-03-2011|      28.4|     39.384|       19.875|  1118.5056|    782.757|   335.7486|
|       Acura|         CL|   Passenger|   01-04-2012|     35.44|     14.114|       18.225|  500.20016|  257.22765|  242.97251|
|       Acura|         RL|   Passenger|   03-10-2011|      42.0|      8.588|       29.725|    360.696|   255.2783|   105.4177|
|        Audi|         A4|   Passenger|   10-08-2011|     23.99|     20.397|       22.255|  489.32403| 453.9352

In [8]:
from pyspark.sql.functions import col, column
import pyspark.sql.functions as F

# select a few columns
df1.select("Manufacturer", "Vehicle_type", F.col("Model"), "Latest_Launch", "Units_Sold", "Revenue", F.lit('DefaultValue')).show(4, False)

+------------+------------+-------+-------------+----------+---------+------------+
|Manufacturer|Vehicle_type|Model  |Latest_Launch|Units_Sold|Revenue  |DefaultValue|
+------------+------------+-------+-------------+----------+---------+------------+
|Acura       |Passenger   |Integra|02-02-2012   |21.5      |363.7585 |DefaultValue|
|Acura       |Passenger   |TL     |06-03-2011   |28.4      |1118.5056|DefaultValue|
|Acura       |Passenger   |CL     |01-04-2012   |35.44     |500.20016|DefaultValue|
|Acura       |Passenger   |RL     |03-10-2011   |42.0      |360.696  |DefaultValue|
+------------+------------+-------+-------------+----------+---------+------------+
only showing top 4 rows



In [9]:
df1.select("*").show(4, False)

+------------+-------+------------+-------------+----------+-----------+-------------+---------+---------+---------+
|Manufacturer|Model  |Vehicle_type|Latest_Launch|Units_Sold|Units_Price|Cost_incurred|Revenue  |Cost     |Profit   |
+------------+-------+------------+-------------+----------+-----------+-------------+---------+---------+---------+
|Acura       |Integra|Passenger   |02-02-2012   |21.5      |16.919     |16.36        |363.7585 |276.79484|86.96366 |
|Acura       |TL     |Passenger   |06-03-2011   |28.4      |39.384     |19.875       |1118.5056|782.757  |335.7486 |
|Acura       |CL     |Passenger   |01-04-2012   |35.44     |14.114     |18.225       |500.20016|257.22765|242.97251|
|Acura       |RL     |Passenger   |03-10-2011   |42.0      |8.588      |29.725       |360.696  |255.2783 |105.4177 |
+------------+-------+------------+-------------+----------+-----------+-------------+---------+---------+---------+
only showing top 4 rows



In [10]:
df1.columns

['Manufacturer',
 'Model',
 'Vehicle_type',
 'Latest_Launch',
 'Units_Sold',
 'Units_Price',
 'Cost_incurred',
 'Revenue',
 'Cost',
 'Profit']

In [11]:
df2 = df1.withColumnRenamed("Vehicle_type", "VehicleType").withColumnRenamed('Units_Sold','units_sold').withColumnRenamed('Units_Price','units_price').withColumnRenamed('Cost_incurred','units_cost')

In [12]:
df2.show(3, False)

+------------+-------+-----------+-------------+----------+-----------+----------+---------+---------+---------+
|Manufacturer|Model  |VehicleType|Latest_Launch|units_sold|units_price|units_cost|Revenue  |Cost     |Profit   |
+------------+-------+-----------+-------------+----------+-----------+----------+---------+---------+---------+
|Acura       |Integra|Passenger  |02-02-2012   |21.5      |16.919     |16.36     |363.7585 |276.79484|86.96366 |
|Acura       |TL     |Passenger  |06-03-2011   |28.4      |39.384     |19.875    |1118.5056|782.757  |335.7486 |
|Acura       |CL     |Passenger  |01-04-2012   |35.44     |14.114     |18.225    |500.20016|257.22765|242.97251|
+------------+-------+-----------+-------------+----------+-----------+----------+---------+---------+---------+
only showing top 3 rows



In [13]:
df1.show(4, False)

+------------+-------+------------+-------------+----------+-----------+-------------+---------+---------+---------+
|Manufacturer|Model  |Vehicle_type|Latest_Launch|Units_Sold|Units_Price|Cost_incurred|Revenue  |Cost     |Profit   |
+------------+-------+------------+-------------+----------+-----------+-------------+---------+---------+---------+
|Acura       |Integra|Passenger   |02-02-2012   |21.5      |16.919     |16.36        |363.7585 |276.79484|86.96366 |
|Acura       |TL     |Passenger   |06-03-2011   |28.4      |39.384     |19.875       |1118.5056|782.757  |335.7486 |
|Acura       |CL     |Passenger   |01-04-2012   |35.44     |14.114     |18.225       |500.20016|257.22765|242.97251|
|Acura       |RL     |Passenger   |03-10-2011   |42.0      |8.588      |29.725       |360.696  |255.2783 |105.4177 |
+------------+-------+------------+-------------+----------+-----------+-------------+---------+---------+---------+
only showing top 4 rows



In [14]:
# adding columns to a dataframe
import pyspark.sql.functions as F

# add a new column "Register_Site" with default value "www.google.com"
dataDF = df1.withColumn("Register_Site", F.lit("www.google.com"))

# display only a few columns
dataDF.select("Manufacturer", "Vehicle_type","Model", "Register_Site").show(3, False)

+------------+------------+-------+--------------+
|Manufacturer|Vehicle_type|Model  |Register_Site |
+------------+------------+-------+--------------+
|Acura       |Passenger   |Integra|www.google.com|
|Acura       |Passenger   |TL     |www.google.com|
|Acura       |Passenger   |CL     |www.google.com|
+------------+------------+-------+--------------+
only showing top 3 rows



In [15]:
# removing columns from a DataFrame

# number of columns in a dataframe - before removing columns
print("Number of columns : ", len(dataDF.columns))

# columns - before dropping
print(list(dataDF.columns))

# drop columns - "Vehicle_type", "Model"
datanewDF = dataDF.drop("Vehicle_type", "Model")

# number of columns in a dataframe - after removing columns
print("Number of columns : ", len(datanewDF.columns))

# columns - after dropping
print(list(datanewDF.columns))

Number of columns :  11
['Manufacturer', 'Model', 'Vehicle_type', 'Latest_Launch', 'Units_Sold', 'Units_Price', 'Cost_incurred', 'Revenue', 'Cost', 'Profit', 'Register_Site']
Number of columns :  9
['Manufacturer', 'Latest_Launch', 'Units_Sold', 'Units_Price', 'Cost_incurred', 'Revenue', 'Cost', 'Profit', 'Register_Site']


In [16]:
# arithmetic with dataframes
# number of columns in a dataframe - before a adding a column
print("Number of columns : ", len(df1.columns))

# perform arithmetic operations on a dataframe column
newDF = df1.withColumn("TotalSale", col("Units_Sold") * col("Units_Price"))

# number of columns in a dataframe - after adding columns
print("Number of columns : ", len(newDF.columns))

# display records
newDF.show(3)

Number of columns :  10
Number of columns :  11
+------------+-------+------------+-------------+----------+-----------+-------------+---------+---------+---------+---------+
|Manufacturer|  Model|Vehicle_type|Latest_Launch|Units_Sold|Units_Price|Cost_incurred|  Revenue|     Cost|   Profit|TotalSale|
+------------+-------+------------+-------------+----------+-----------+-------------+---------+---------+---------+---------+
|       Acura|Integra|   Passenger|   02-02-2012|      21.5|     16.919|        16.36| 363.7585|276.79484| 86.96366| 363.7585|
|       Acura|     TL|   Passenger|   06-03-2011|      28.4|     39.384|       19.875|1118.5056|  782.757| 335.7486|1118.5056|
|       Acura|     CL|   Passenger|   01-04-2012|     35.44|     14.114|       18.225|500.20016|257.22765|242.97251|500.20016|
+------------+-------+------------+-------------+----------+-----------+-------------+---------+---------+---------+---------+
only showing top 3 rows



In [17]:
# filter a dataframe

df1.where(col("Manufacturer") == "Cadillac").show(5)

+------------+--------+------------+-------------+----------+-----------+-------------+-----------+-----------+----------+
|Manufacturer|   Model|Vehicle_type|Latest_Launch|Units_Sold|Units_Price|Cost_incurred|    Revenue|       Cost|    Profit|
+------------+--------+------------+-------------+----------+-----------+-------------+-----------+-----------+----------+
|    Cadillac| DeVille|   Passenger|    2/23/2012|    39.895|     63.729|       22.525|2542.468455|1435.495725|1106.97273|
|    Cadillac| Seville|   Passenger|    4/29/2011|    44.475|     15.943|         27.1| 709.064925|   432.0553|277.009625|
|    Cadillac|Eldorado|   Passenger|   11/27/2011|    39.665|      6.536|       25.725|  259.25044|   168.1386|  91.11184|
|    Cadillac|  Catera|   Passenger|    9/28/2011|     31.01|     11.185|       18.225|  346.84685| 203.846625|143.000225|
|    Cadillac|Escalade|         Car|    4/17/2012|    46.225|     14.785|         23.0| 683.436625|    340.055|343.381625|
+------------+--

In [18]:
# filter a dataframe - multiple columns

df1.where((col("Manufacturer") == "Cadillac") & (col("Vehicle_type") == "Passenger")).show(5)

+------------+--------+------------+-------------+----------+-----------+-------------+-----------+-----------+----------+
|Manufacturer|   Model|Vehicle_type|Latest_Launch|Units_Sold|Units_Price|Cost_incurred|    Revenue|       Cost|    Profit|
+------------+--------+------------+-------------+----------+-----------+-------------+-----------+-----------+----------+
|    Cadillac| DeVille|   Passenger|    2/23/2012|    39.895|     63.729|       22.525|2542.468455|1435.495725|1106.97273|
|    Cadillac| Seville|   Passenger|    4/29/2011|    44.475|     15.943|         27.1| 709.064925|   432.0553|277.009625|
|    Cadillac|Eldorado|   Passenger|   11/27/2011|    39.665|      6.536|       25.725|  259.25044|   168.1386|  91.11184|
|    Cadillac|  Catera|   Passenger|    9/28/2011|     31.01|     11.185|       18.225|  346.84685| 203.846625|143.000225|
+------------+--------+------------+-------------+----------+-----------+-------------+-----------+-----------+----------+



In [19]:
# dropping rows
testDF = [[1, "January"], [2, "February"], [1, "January"], [3, "March"], [3, "March"], [3, "March"], [4, "April"], [4, "April"], [5, "May"], [5, "May"],
          [4, "April"], [6, "June"], [5, "April"]]

# import the modules
from pyspark.sql.types import *

# define the schema
schema = StructType([StructField("ID", IntegerType()),StructField("Month", StringType())])

# create the dataframe by applying schema
df_new = spark.createDataFrame(testDF,schema=schema) 

# display the records
df_new.show()

Py4JJavaError: An error occurred while calling o106.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 1 times, most recent failure: Lost task 0.0 in stage 12.0 (TID 12) (Munasib14 executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:189)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:164)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:131)
	at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:535)
	at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:189)
	at java.net.ServerSocket.implAccept(ServerSocket.java:545)
	at java.net.ServerSocket.accept(ServerSocket.java:513)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:176)
	... 29 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2249)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2268)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:506)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:459)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3868)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2863)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3858)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3856)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2863)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3084)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:288)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:327)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:189)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:164)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:131)
	at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:535)
	at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:189)
	at java.net.ServerSocket.implAccept(ServerSocket.java:545)
	at java.net.ServerSocket.accept(ServerSocket.java:513)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:176)
	... 29 more


In [None]:
# display distinct rows
df_new.distinct().show()

In [None]:
# drop duplicate records based a column value
df_new.dropDuplicates(['Month']).show()

# drop duplicate records based multiple column values
df_new.dropDuplicates(['Month', 'ID']).show()

In [None]:
# rename existing columns
newDF1 = df1.withColumnRenamed("Units_Price", "UnitPrice").withColumnRenamed("Profit", "Total_Profit")

df1.show(3) # display records

from pyspark.sql.functions import expr # define the modules

# using select expression 
newDF1.select("Manufacturer", "Model",expr("CASE WHEN Total_Profit > 104 THEN  'Good' ELSE 'Average' END AS value_desc")).show(3)

In [None]:
from pyspark.sql.types import *   # import the libraries

# define a list
list_data = [["Bill Gates",23],["Henry Ford", None], ["Tim Cook", None]]

# define the schema
schema = StructType([StructField("Name", StringType()),StructField("Experience", IntegerType())])

# create a dataframe 
df_new = spark.createDataFrame(list_data,schema=schema)

df_new.show() # display the dataframe

In [None]:
# drop null value rows
df_new.na.drop().show()

In [None]:
# fill null value with a constant value
df_new.fillna(34).show()

In [None]:
# replace a single value
df_new.na.replace('Bill Gates', 'Satya Nadella').show()

In [None]:
# replace multiple values and also fill 'null' with a constant value
df_new.na.replace(['Bill Gates', 'Tim Cook'], ['Satya N', 'Time'], 'Name').fillna(40).show()

In [None]:
# rename the existing columns - "Profit" to "Total_Profit"
newDF1 = df1.withColumnRenamed("Profit", "Total_Profit")

# find maximum total_profit for each region and alias the column to "Maximum"
newDF1.groupBy("Manufacturer").max("Total_Profit").alias("Maximum").show(10, False)

In [None]:
# count of models by each manufacturer
newDF1.groupBy("Manufacturer").agg({'Model':'count'}).show(10, False)

In [None]:
from pyspark.sql.functions import avg # include the library

# find average of column - "Total_Profit" 
newDF1.select(avg("Total_Profit").alias("Average Profit")).show()

In [None]:
# include the library
from pyspark.sql.functions import col

# order the records by manufacturer - ascending
df1.orderBy('Manufacturer', ascending=True).select("Manufacturer","Model","Vehicle_type", "Profit").show(3)

In [None]:
# include the library
from pyspark.sql.functions import col

# order the records by manufacturer - desc
df1.orderBy('Manufacturer', ascending=False).select("Manufacturer","Model","Vehicle_type", "Profit").show(3)

In [None]:
# cache and persist
from pyspark import StorageLevel

# cache the dataframe in in-memory
cacheDF = df1.cache()

# read the records from cache
cacheDF.select("Manufacturer", "Model", "Vehicle_type",  \
               "Latest_Launch").show(4, truncate=False)

In [None]:
# cache and persist
from pyspark import StorageLevel

# persist the dataframe in both memo
persistDF = df1.persist(StorageLevel.MEMORY_AND_DISK)

# read the records from saved dataframe
persistDF.select("Manufacturer", "Model", "Vehicle_type",  \
               "Latest_Launch").show(4, truncate=False)

In [None]:
# coalesce vs repartition
print("Number of partitions : ", df1.rdd.getNumPartitions())

# increase the number of partitions
cDF = df1.repartition(2)

# number of partitions after repatitioning
print("Number of partitions : ", cDF.rdd.getNumPartitions())

# reduce the number of partitions
cDF = cDF.coalesce(1)

# number of partitions after coalesce
print("Number of partitions : ", cDF.rdd.getNumPartitions())

In [None]:
# aggregates the Vehicle Type count by Manufacturer, brings the data to a single partition
writeDF = newDF1.groupBy("Manufacturer").agg({'Model':'count'}).coalesce(1)  

# write to DBFS - mode: "overwrite" replaces the existing file and "append" adds the content
writeDF.write.option("header","true").option("sep",",").mode("overwrite").csv("/user/glbigdata12/Aggregate/")

In [None]:
#%fs ls "/user/glbigdata12/Aggregate"

In [None]:
# read the csv file
newDF1 = spark.read.format("csv").option("header", "true").option("inferSchema", "true") \
   .load("/user/glbigdata12/Aggregate/part-00000-1de425a3-41cf-46d1-8fe8-c74b9d62149f-c000.csv")

# display the records
newDF1.show(10, False)

In [None]:
# spark SQL
# create a DataFrame
from pyspark.sql.types import *   # import the library
leader_data = [["Dodge","Mohammed Saif"],["Cadillac", "George Carlin"], \
               ["BMW", "Stuart Broad"], ["Ford", "Abdalla"], ["Hyundai", "Chris Gayle"], \
               ["Lexus", "George Bush"], ["Mercury", "Tatyaso Martin"]]

# define the schema
schema = StructType([StructField("Manufacturer", StringType()), StructField("SalesPerson", StringType())])

# create a dataframe and display the records
df_new = spark.createDataFrame(leader_data,schema=schema)
df_new.show(10, False)

In [None]:
df_new.createOrReplaceTempView("sales_table")  # convert dataframe to view

# write sql queries using sql()
spark.sql("select * from sales_table").show(10, False)

In [None]:
spark.sql("select * from sales_table where Manufacturer = 'Cadillac'").show(10, False)

In [None]:
spark.sql("select * from sales_table where SalesPerson like '%George%'").show(10, False)

In [None]:
spark.sql("select count(*) from sales_table").show()

In [None]:
df1.createOrReplaceTempView("vehicle")

spark.sql("select * from vehicle").show(1, False)

In [None]:
# renaming a column using DSL
newDF1 = df1.withColumnRenamed("Revenue", "TotalRevenue")

# create a temp view

newDF1.createOrReplaceTempView("vehicle")

# apply aggregations on the table data
spark.sql("select Manufacturer, max(TotalRevenue) from vehicle group by Manufacturer").show(truncate=False)

In [None]:
spark.sql("select Manufacturer, max(TotalRevenue) from vehicle group by Manufacturer order by Manufacturer").show(truncate=False)

In [None]:
spark.sql("select Manufacturer, max(TotalRevenue) from vehicle group by Manufacturer order by Manufacturer desc").show(truncate=False)

In [None]:
# join (inner) vehicle and sales_table, display the results
spark.sql("""select a.Manufacturer, a.Model, b.SalesPerson
       from vehicle a
       join sales_table b
       on trim(a.Manufacturer) = trim(b.Manufacturer)""").show(5, False)

In [None]:
# join (inner) vehicle and sales_table, apply a where condition, display the results
df_new = spark.sql("""select a.Manufacturer, a.Model, b.SalesPerson
       from vehicle a
       join sales_table b
       on trim(a.Manufacturer) = trim(b.Manufacturer)
       where trim(a.Manufacturer) = "Cadillac"
       """).show(5, False)

In [None]:
# write the results in to DBFS
df_new = spark.sql("""select a.Manufacturer, a.Model, b.SalesPerson
       from vehicle a
       join sales_table b
       on trim(a.Manufacturer) = trim(b.Manufacturer)
       where trim(a.Manufacturer) = "Cadillac"
       """)


df_new.coalesce(1).write.option("header","true").mode("overwrite").csv("/user/glbigdata12/spark/")

In [None]:
#%fs ls "/user/glbigdata12/spark/" 