## This notebook covers the following concepts
- Basic structured operations

In [21]:
# Per the findspark README, init() locates the Spark installation and makes
# pyspark importable, so it has to run before the pyspark imports in the next cell.
import findspark
findspark.init()

In [22]:
import pyspark
from pyspark.sql import SparkSession

from pyspark.sql.types import StructField, StructType, StringType, LongType, Row
from pyspark.sql.functions import *
from pyspark.sql import Row
from pprint import pprint

In [23]:
# Obtain the session for this notebook: getOrCreate() hands back an existing
# session when there is one and builds a new one otherwise.
spark = (
    SparkSession.builder
    .appName("spark_definitive_guide")
    .getOrCreate()
)

In [11]:
# Uncomment to shut down the Spark session when finished:
# spark.stop()

In [24]:
# Explicit schema for the flight-summary data: two nullable string columns and
# a non-nullable long `count` column that carries custom metadata.
manualSchema = StructType([
    StructField(name='DEST_COUNTRY_NAME', dataType=StringType(), nullable=True),
    StructField(name='ORIGIN_COUNTRY_NAME', dataType=StringType(), nullable=True),
    StructField(
        name='count',
        dataType=LongType(),
        nullable=False,
        metadata={"hello": "world"},
    ),
])

In [25]:
# Relative path to the 2015 flight-summary JSON file.
path = "./../src/flight-data/json/2015-summary.json"

# Load the file with the hand-written schema instead of relying on inference.
df = (
    spark.read
    .schema(manualSchema)
    .format('json')
    .load(path)
)

In [26]:
# DataFrame attributes (accessed without parentheses, unlike methods).
# Uncomment one at a time to inspect it; the cell displays its last expression.

# df.schema
# df.columns
# df.dtypes
# df.isStreaming
# df.na
# df.rdd
df.sparkSession
# df.stat
# df.storageLevel
# df.write
# df.writeStream

# Uncomment to list every name defined on the DataFrame class together with its type:
# print([(name, type(getattr(pyspark.sql.dataframe.DataFrame, name))) for name in dir(pyspark.sql.dataframe.DataFrame)])


In [27]:
# selectExpr takes SQL expression strings, so the same column can be selected
# twice: once aliased inline with `as`, once under its original name.
df.selectExpr(
    'DEST_COUNTRY_NAME as country_name',
    'DEST_COUNTRY_NAME',
).show()

+--------------------+--------------------+
|        country_name|   DEST_COUNTRY_NAME|
+--------------------+--------------------+
|       United States|       United States|
|       United States|       United States|
|       United States|       United States|
|               Egypt|               Egypt|
|       United States|       United States|
|       United States|       United States|
|       United States|       United States|
|          Costa Rica|          Costa Rica|
|             Senegal|             Senegal|
|             Moldova|             Moldova|
|       United States|       United States|
|       United States|       United States|
|              Guyana|              Guyana|
|               Malta|               Malta|
|            Anguilla|            Anguilla|
|             Bolivia|             Bolivia|
|       United States|       United States|
|             Algeria|             Algeria|
|Turks and Caicos ...|Turks and Caicos ...|
|       United States|       Uni

### Difference between select and selectExpr
- `select(*cols: ColumnOrName) -> DataFrame`
- `selectExpr(*expr: str) -> DataFrame`

`select` takes column names or `Column` objects, while `selectExpr` takes SQL expression strings.

In [34]:
# select vs. selectExpr

# select accepts plain column names ...
df.select('DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count')
# ... or Column objects, here one built from a SQL snippet via expr()
df.select(expr("DEST_COUNTRY_NAME as dcount"))

# selectExpr accepts SQL expression strings directly; several can be passed at once
df.selectExpr(
    "*",
    "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as sameCountry",
)
# aggregates are written as SQL text, so avg/count do not need to be imported for this call
df.selectExpr(
    'avg(count)',
    'count(distinct(DEST_COUNTRY_NAME))',
)

# Column names that contain spaces
dfWithLongColName = df.withColumn("This colName have spaces", expr("count"))
# select receives the literal column name, so no backticks are needed
dfWithLongColName.select("This colName have spaces")
# inside an expression the name must be wrapped in backticks to be read as one identifier
dfWithLongColName.selectExpr("`This colName have spaces` * 10")

# Renaming: returns all columns with the given one renamed
df.withColumnRenamed("DEST_COUNTRY_NAME", "new_name_dest_country")
# calls can be chained to rename several columns
(
    df
    .withColumnRenamed("DEST_COUNTRY_NAME", "dest_new_name")
    .withColumnRenamed("ORIGIN_COUNTRY_NAME", "orig_new_name")
)

DataFrame[(This colName have spaces * 10): bigint]

### Difference between filter and where
- `filter(condition: ColumnOrName) -> DataFrame`
- `where(condition: ColumnOrName) -> DataFrame`

The two are exactly the same: `where` is an alias of `filter`. It exists for people coming from SQL who prefer the familiar name.

In [49]:
# filter / where: a Column condition and a SQL string condition are interchangeable
df.filter(col("count") < 5)
df.where("count < 5")
# chaining where() calls applies the conditions one after another (logical AND)
df.where("count < 5").where("ORIGIN_COUNTRY_NAME == 'Croatia'")

# drop returns a new DataFrame without the listed columns; df itself is not modified
df.drop('DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME')

# distinct() de-duplicates on the combination of all selected columns
df.select('DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME').distinct().count()  # 256
df.select('DEST_COUNTRY_NAME').distinct().count()  # 132
df.distinct().count()  # 256

# sort and orderBy are the same; orderBy is an alias of sort for SQL users.
# NOTE: df.sort(expr("count desc")) does NOT sort descending. The SQL parser
# reads `desc` as an alias for the `count` column, so the sort silently stays
# ascending. Use the Column method (or the desc() function) instead.
df.sort(col("count").desc())
df.sort(col("count").asc(), col("ORIGIN_COUNTRY_NAME").desc())
# Prefer asc_nulls_first / asc_nulls_last / desc_nulls_first / desc_nulls_last
# to state explicitly where null values should be placed.
df.orderBy(desc_nulls_last("ORIGIN_COUNTRY_NAME"), asc_nulls_first("DEST_COUNTRY_NAME"))

df.limit(5).show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
+-----------------+-------------------+-----+



In [28]:
# Appending rows: build a small DataFrame that reuses df's schema, then union it onto df
schema = df.schema
newRows = [
    Row("New Country", "Other Country", 5),
    Row("New Country 2", "Other Country 2", 1),
]
parallelizedRows = spark.sparkContext.parallelize(newRows)
newDf = spark.createDataFrame(parallelizedRows, schema)

df2 = df.union(newDf)

In [29]:
# Print the combined DataFrame; show()'s default arguments are written out explicitly.
df2.show(n=20, truncate=True)

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

23/04/28 20:02:55 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 997208 ms exceeds timeout 120000 ms
23/04/28 20:02:55 WARN SparkContext: Killing executors is not supported by current scheduler.
23/04/28 20:20:34 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:103)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:87)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:643)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1057)
	at org.apache.spark.executor.Executor.$anonfun$heartbeater$1(Executor.scala:238)
	at sc

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 57291)
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/socketserver.py", line 316, in _handle_request_noblock
    self.process_request(request, client_address)
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/socketserver.py", line 347, in process_request
    self.finish_request(request, client_address)
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/socketserver.py", line 360, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/socketserver.py", line 747, in __init__
    self.handle()
  File "/Users/abhishekpathak/Code & Stuff/self/Learning/SPARK/spark_learning/spark_learning/lib/python3.10/site-packages/pyspark/accumulators.py", line 281, in handle
    poll(accum_update

In [20]:
import os

# Collect the names exposed by the os module and display them as the cell result.
os_attribute_names = dir(os)
os_attribute_names

['CLD_CONTINUED',
 'CLD_DUMPED',
 'CLD_EXITED',
 'CLD_KILLED',
 'CLD_STOPPED',
 'CLD_TRAPPED',
 'DirEntry',
 'EX_CANTCREAT',
 'EX_CONFIG',
 'EX_DATAERR',
 'EX_IOERR',
 'EX_NOHOST',
 'EX_NOINPUT',
 'EX_NOPERM',
 'EX_NOUSER',
 'EX_OK',
 'EX_OSERR',
 'EX_OSFILE',
 'EX_PROTOCOL',
 'EX_SOFTWARE',
 'EX_TEMPFAIL',
 'EX_UNAVAILABLE',
 'EX_USAGE',
 'F_LOCK',
 'F_OK',
 'F_TEST',
 'F_TLOCK',
 'F_ULOCK',
 'GenericAlias',
 'Mapping',
 'MutableMapping',
 'NGROUPS_MAX',
 'O_ACCMODE',
 'O_APPEND',
 'O_ASYNC',
 'O_CLOEXEC',
 'O_CREAT',
 'O_DIRECTORY',
 'O_DSYNC',
 'O_EVTONLY',
 'O_EXCL',
 'O_EXLOCK',
 'O_FSYNC',
 'O_NDELAY',
 'O_NOCTTY',
 'O_NOFOLLOW',
 'O_NOFOLLOW_ANY',
 'O_NONBLOCK',
 'O_RDONLY',
 'O_RDWR',
 'O_SHLOCK',
 'O_SYMLINK',
 'O_SYNC',
 'O_TRUNC',
 'O_WRONLY',
 'POSIX_SPAWN_CLOSE',
 'POSIX_SPAWN_DUP2',
 'POSIX_SPAWN_OPEN',
 'PRIO_PGRP',
 'PRIO_PROCESS',
 'PRIO_USER',
 'P_ALL',
 'P_NOWAIT',
 'P_NOWAITO',
 'P_PGID',
 'P_PID',
 'P_WAIT',
 'PathLike',
 'RTLD_GLOBAL',
 'RTLD_LAZY',
 'RTLD_LOCAL',