In [21]:
from pyspark.sql import SparkSession

In [22]:
spark =  SparkSession\
        .builder\
        .appName('readfiles')\
        .getOrCreate()
spark        

In [93]:
## read csv file
df = spark.read.csv("train_schedule.csv", header=True)
df.show()

+--------+-------------+-----+
|train_id|      station| time|
+--------+-------------+-----+
|     324|San Francisco|7:59a|
|     324|  22nd Street|8:03a|
|     324|     Millbrae|8:16a|
|     324|    Hillsdale|8:24a|
|     324| Redwood City|8:31a|
|     324|    Palo Alto|8:37a|
|     324|     San Jose|9:05a|
|     217|       Gilroy|6:06a|
|     217|   San Martin|6:15a|
|     217|  Morgan Hill|6:21a|
|     217| Blossom Hill|6:36a|
|     217|      Capitol|6:42a|
|     217|       Tamien|6:50a|
|     217|     San Jose|6:59a|
+--------+-------------+-----+



In [51]:
# Another way of reading same file
df = spark.read.option("header","True").csv("train_schedule.csv")
df.show()

+--------+-------------+-----+
|train_id|      station| time|
+--------+-------------+-----+
|     324|San Francisco|7:59a|
|     324|  22nd Street|8:03a|
|     324|     Millbrae|8:16a|
|     324|    Hillsdale|8:24a|
|     324| Redwood City|8:31a|
|     324|    Palo Alto|8:37a|
|     324|     San Jose|9:05a|
|     217|       Gilroy|6:06a|
|     217|   San Martin|6:15a|
|     217|  Morgan Hill|6:21a|
|     217| Blossom Hill|6:36a|
|     217|      Capitol|6:42a|
|     217|       Tamien|6:50a|
|     217|     San Jose|6:59a|
+--------+-------------+-----+



### Spark provides this functionality of not to define schema and tries to infer it automatically

In [52]:
df = spark.read.csv("train_schedule.csv", header = True, inferSchema = True)
df.printSchema()

root
 |-- train_id: integer (nullable = true)
 |-- station: string (nullable = true)
 |-- time: string (nullable = true)



### StructType and StructField combination help you to define your own schema

In [100]:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import IntegerType, StringType, TimestampType

### Create your own schema by referring columns and casting it to required type

In [99]:
data_schema = StructType([
                            StructField('train_id', IntegerType(), True),
                            StructField('station', StringType(), True),
                            StructField('time', TimestampType(), True)
])

### Read file by referring your custom schema

In [68]:
df = spark.read.csv("train_schedule.csv", schema = data_schema)

In [69]:
df.printSchema()

root
 |-- train_id: integer (nullable = true)
 |-- station: string (nullable = true)
 |-- time: timestamp (nullable = true)



### Convert Spark dataframe to Pandas dataframe using toPandas()

In [108]:
df = spark.read.csv("train_schedule.csv", header = True)
type(df)

pyspark.sql.dataframe.DataFrame

In [107]:
## read spark dataframe
df.head(2)

[Row(train_id='324', station='San Francisco', time='7:59a'),
 Row(train_id='324', station='22nd Street', time='8:03a')]

In [104]:
pdDF = df.toPandas()
type(pdDF)

pandas.core.frame.DataFrame

In [96]:
#read pandas dataframe
pdDF.head(2)

Unnamed: 0,train_id,station,time
0,324,San Francisco,7:59a
1,324,22nd Street,8:03a


### Convert Pandas dataframe to Spark dataframe using spark.createDataFrame()

In [110]:
sparkDF = spark.createDataFrame(pdDF)
type(sparkDF)

pyspark.sql.dataframe.DataFrame

### Convert RDD to Spark Dataframe

In [119]:
from pyspark import SparkConf
from pyspark import SparkContext
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))

In [122]:
rddSc = sc.parallelize([1,2,3])
rddSc

ParallelCollectionRDD[312] at parallelize at PythonRDD.scala:195

### Executing SQL statment on File by registering it as tables

In [127]:
spark.catalog.listTables()

[]

In [4]:
data = spark.read.csv('train_schedule.csv', header=True)

In [5]:
data.createOrReplaceTempView('data_table')

In [138]:
query = 'SELECT * FROM data_table'
spark.sql(query).show(5)

+--------+-------------+-----+
|train_id|      station| time|
+--------+-------------+-----+
|     324|San Francisco|7:59a|
|     324|  22nd Street|8:03a|
|     324|     Millbrae|8:16a|
|     324|    Hillsdale|8:24a|
|     324| Redwood City|8:31a|
+--------+-------------+-----+
only showing top 5 rows



### Spark SQL 6 : Chek tables presence using catalog.listTables()

In [None]:
# temporary table is created using createOrReplaceTempView
data_table = data.createOrReplaceTempView('data_table')

In [141]:
spark.catalog.listTables()

[Table(name='data_table', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [143]:
# drop your table
spark.catalog.dropTempView('data_table')

In [144]:
# check again
spark.catalog.listTables()

[]

## SparkSQL 7

### Caching and UnCaching tables

#### To cache Table/View : spark.catalog.cacheTable()
#### To uncache Table/view : spark.catalog.uncacheTable()
#### To check wheter cached or not : spark.catalog.isCached()

In [6]:
# List the tables
print("Tables:\n", spark.catalog.listTables())

# Cache table1 and Confirm that it is cached
spark.catalog.cacheTable('data_table')
print("table1 is cached: ", spark.catalog.isCached('data_table'))

# Uncache table1 and confirm that it is uncached
spark.catalog.uncacheTable('data_table')
print("table1 is cached: ", spark.catalog.isCached('data_table'))

Tables:
 [Table(name='data_table', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]
table1 is cached:  True
table1 is cached:  False


## #SparkSQL 8

### Caching Spark DataFrames
#### To cache : cache() or persist()
#### To uncache : unpersist()

In [7]:
data = spark.read.csv('train_schedule.csv', header=True)

In [18]:
data.cache()
data.is_cached

True

In [20]:
data.unpersist()
data.is_cached

False

In [26]:
data.persist(storageLevel= pyspark.StorageLevel.MEMORY_AND_DISK)
data.is_cached

True

## SparkSQL 9

#### Query data using DataFrame API and Spark SQL

In [28]:
data.show(5)

+--------+-------------+-----+
|train_id|      station| time|
+--------+-------------+-----+
|     324|San Francisco|7:59a|
|     324|  22nd Street|8:03a|
|     324|     Millbrae|8:16a|
|     324|    Hillsdale|8:24a|
|     324| Redwood City|8:31a|
+--------+-------------+-----+
only showing top 5 rows



In [27]:
data.groupBy('train_id').agg({'station':'count'}).show()

+--------+--------------+
|train_id|count(station)|
+--------+--------------+
|     217|             7|
|     324|             7|
+--------+--------------+



In [31]:
data.createOrReplaceTempView('data_table')
query ='SELECT train_id, count(station) FROM data_table GROUP BY train_id'
spark.sql(query).show()

+--------+--------------+
|train_id|count(station)|
+--------+--------------+
|     217|             7|
|     324|             7|
+--------+--------------+



## Spark SQL 10

### GlobalTempView

Temporary views in Spark SQL are session-scoped and will disappear<br> 
if the session that creates it terminates. If you want to have a temporary view<br>
that is shared among all sessions and keep alive until the Spark application terminates,<br>
you can create a global temporary view.

* Don't forget to include **global_temp** while referring global table/view

In [None]:
data.createGlobalTempView('dataglb_tbl')

In [34]:
query = 'SELECT * FROM global_temp.dataglb_tbl'
spark.sql(query).show(5)

+--------+-------------+-----+
|train_id|      station| time|
+--------+-------------+-----+
|     324|San Francisco|7:59a|
|     324|  22nd Street|8:03a|
|     324|     Millbrae|8:16a|
|     324|    Hillsdale|8:24a|
|     324| Redwood City|8:31a|
+--------+-------------+-----+
only showing top 5 rows



## Spark SQL 11

A common confusion happens while creating dataframe in Pandas and Spark.
* To create dataframe in pandas use: DataFrame()
* To create data dataframe in spark : createDataFrame()

In [None]:
pd_data = pd.DataFrame( 
                        {'Name' : 'Sumo', 'City':'UK'},\
                        {'Name' : 'Mark', 'City':'US'}
                    )
print('******* Pandas DatFrame*******')
print(pd_data.head())

print('******* Spark DatFrame*******')
spark_data= spark.createDataFrame(pd_data, schema=['Name', 'City'])
print(spark_data.show())

In [34]:
import pandas as pd
from pyspark.sql import Row
from pyspark.sql.types import StringType, StructType, StructField

pd_data = pd.DataFrame( 
                        {'Name' : 'Sumo', 'City':'UK'},\
                        {'Name' : 'Mark', 'City':'US'}
                    )
print('******* Pandas DatFrame*******')
print(pd_data.head())

schema = StructType([
                    StructField('Name' , StringType(), True),
                    StructField('City', StringType(), True)
                ])
spark_data = spark.createDataFrame([Row('Sumo','UK'),
                                    Row('Mark','US')], schema=schema)
print(spark_data.head())

******* Pandas DatFrame*******
      Name City
Name  Sumo   UK
City  Sumo   UK


Py4JJavaError: An error occurred while calling o162.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost, executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:170)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:97)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:108)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:135)
	at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
	at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:199)
	at java.net.ServerSocket.implAccept(ServerSocket.java:545)
	at java.net.ServerSocket.accept(ServerSocket.java:513)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:164)
	... 32 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3258)
	at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3255)
	at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3255)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:170)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:97)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:117)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:108)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:135)
	at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
	at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:199)
	at java.net.ServerSocket.implAccept(ServerSocket.java:545)
	at java.net.ServerSocket.accept(ServerSocket.java:513)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:164)
	... 32 more


### Explain plan dataframes and tables

### UDF functions