In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Avi').getOrCreate()

23/03/04 10:21:42 WARN Utils: Your hostname, avijit-HP-Laptop-15q-bu0xx resolves to a loopback address: 127.0.1.1; using 192.168.18.7 instead (on interface wlo1)
23/03/04 10:21:42 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/04 10:21:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [7]:
flight_df = spark.read.format('csv').option("inferschema","true").option("header", "true").load("./data/2015-summary.csv")

In [8]:
flight_df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: integer (nullable = true)



In [9]:
spark.read.format('csv').option("inferschema","true").option("header", "true").load("./data/2015-summary.csv").schema

StructType([StructField('DEST_COUNTRY_NAME', StringType(), True), StructField('ORIGIN_COUNTRY_NAME', StringType(), True), StructField('count', IntegerType(), True)])

In [14]:
from pyspark.sql.types import StructType,StructField,StringType,LongType

In [15]:
manual_schema = StructType(
[
    StructField("DEST_COUNTRY_NAME",StringType(),True),
    StructField("ORIGIN_COUNTRY_NAME",StringType(),True),
    StructField("count",LongType(),False,metadata={"hello":"world"})
    
])

flight_df = spark.read.format("csv").option("header",'true').schema(manual_schema).load("./data/2015-summary.csv")

In [17]:
flight_df.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

In [18]:
# columns are logical transformations that simply represent a value computed on per record basis simply based on a value 
from pyspark.sql.functions import col,column

In [19]:
col('someColumnName')

Column<'someColumnName'>

In [20]:
column('someColumnName')

Column<'someColumnName'>

Expression: an expression is a set of transformations on one or more values in a record in a Dataframe.

Spark Compiles these to a logical tree specifying the order of transformations

In [29]:
from pyspark.sql.functions import expr
expr("(((someCol+5)*200)-6) < otherCol")

Column<'((((someCol + 5) * 200) - 6) < otherCol)'>

above code looks like sql code but the execution will be same as spark builds a logical tree prior to execution

In [30]:
flight_df.columns

['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']

In [31]:
flight_df.first()

Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15)

# Creating Rows


In [33]:
from pyspark.sql import Row
my_row = Row("hello",None,1,False)

In [34]:
my_row[0]

'hello'

In [36]:
my_row[2]

1

# DataFrame Transformations

# Creating a DataFrame from Set of Rows

In [39]:
from pyspark.sql import Row
from pyspark.sql.types import StructType,StructField,StringType,LongType
my_schema = StructType([
    StructField('some',StringType(),True),
    StructField(('col'),StringType(),True),
    StructField("names",LongType(),False)
])

my_row = Row("hello",None,1)
my_df = spark.createDataFrame([my_row],my_schema)

In [40]:
my_df.show()

+-----+----+-----+
| some| col|names|
+-----+----+-----+
|hello|null|    1|
+-----+----+-----+



In [42]:
flight_df.select("DEST_COUNTRY_NAME","ORIGIN_COUNTRY_NAME").show(2)


+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
+-----------------+-------------------+
only showing top 2 rows



In [44]:
flight_df.select(expr("DEST_COUNTRY_NAME as destination").alias('DEST_COUNTRY_NAME')).show(2)


+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
only showing top 2 rows



In [45]:
flight_df.selectExpr("DEST_COUNTRY_NAME as destination",'DEST_COUNTRY_NAME').show(2)


+-------------+-----------------+
|  destination|DEST_COUNTRY_NAME|
+-------------+-----------------+
|United States|    United States|
|United States|    United States|
+-------------+-----------------+
only showing top 2 rows



In [48]:
flight_df.selectExpr("*", "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as within_country").show()

+--------------------+-------------------+-----+--------------+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|within_country|
+--------------------+-------------------+-----+--------------+
|       United States|            Romania|   15|         false|
|       United States|            Croatia|    1|         false|
|       United States|            Ireland|  344|         false|
|               Egypt|      United States|   15|         false|
|       United States|              India|   62|         false|
|       United States|          Singapore|    1|         false|
|       United States|            Grenada|   62|         false|
|          Costa Rica|      United States|  588|         false|
|             Senegal|      United States|   40|         false|
|             Moldova|      United States|    1|         false|
|       United States|       Sint Maarten|  325|         false|
|       United States|   Marshall Islands|   39|         false|
|              Guyana|      United State

In [49]:
flight_df.selectExpr("avg(count)","count(distinct(DEST_COUNTRY_NAME))").show()

+-----------+---------------------------------+
| avg(count)|count(DISTINCT DEST_COUNTRY_NAME)|
+-----------+---------------------------------+
|1770.765625|                              132|
+-----------+---------------------------------+



# Converting to Spark Types Literals

In [50]:
from pyspark.sql.functions import lit
flight_df.select(expr("*"),lit(1).alias("one")).show(2)

+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|one|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
+-----------------+-------------------+-----+---+
only showing top 2 rows



In [51]:
flight_df.selectExpr("*","1 as one").show(2)

+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|one|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
+-----------------+-------------------+-----+---+
only showing top 2 rows



# adding a column

In [52]:
flight_df.withColumn("number_one",lit(1)).show(2)

+-----------------+-------------------+-----+----------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|number_one|
+-----------------+-------------------+-----+----------+
|    United States|            Romania|   15|         1|
|    United States|            Croatia|    1|         1|
+-----------------+-------------------+-----+----------+
only showing top 2 rows



In [53]:
flight_df.withColumn("within_country",expr("DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME")).show(2)

+-----------------+-------------------+-----+--------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|within_country|
+-----------------+-------------------+-----+--------------+
|    United States|            Romania|   15|         false|
|    United States|            Croatia|    1|         false|
+-----------------+-------------------+-----+--------------+
only showing top 2 rows



In [54]:
flight_df.withColumnRenamed("DEST_COUNTRY_NAME","dest").columns

['dest', 'ORIGIN_COUNTRY_NAME', 'count']

In [55]:
"""
parindoko manzil milegi humesha ye phaile huen unke pankh bolte hain...
Wohin log rehte hain khamosh aksar zamaneme jinke hunar bolte hain...

"""

'\nparindoko manzil milegi humesha ye phaile huen unke pankh bolte hain...\nWohin log rehte hain khamosh aksar zamaneme jinke hunar bolte hain...\n\n'

# Filtering Rows

In [58]:
flight_df.where("count<2").show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [63]:
flight_df.where("count<2").where(col("ORIGIN_COUNTRY_NAME")=="Croatia").show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
+-----------------+-------------------+-----+



# Getting Unique Rows

In [64]:
flight_df.select("ORIGIN_COUNTRY_NAME","DEST_COUNTRY_NAME").distinct().count()

256

In [65]:
flight_df.select("ORIGIN_COUNTRY_NAME","DEST_COUNTRY_NAME").distinct().show()

+-------------------+--------------------+
|ORIGIN_COUNTRY_NAME|   DEST_COUNTRY_NAME|
+-------------------+--------------------+
|            Romania|       United States|
|            Croatia|       United States|
|            Ireland|       United States|
|      United States|               Egypt|
|              India|       United States|
|          Singapore|       United States|
|            Grenada|       United States|
|      United States|          Costa Rica|
|      United States|             Senegal|
|      United States|             Moldova|
|       Sint Maarten|       United States|
|   Marshall Islands|       United States|
|      United States|              Guyana|
|      United States|               Malta|
|      United States|            Anguilla|
|      United States|             Bolivia|
|           Paraguay|       United States|
|      United States|             Algeria|
|      United States|Turks and Caicos ...|
|          Gibraltar|       United States|
+----------

# Random Samples 


In [67]:
seed = 5 
withReplacment = False
fraction = 0.5
flight_df.sample(withReplacment,fraction,seed).count()

138

# Random Splits 

In [68]:
flight_dfs = flight_df.randomSplit([0.25,0.75],seed)

In [71]:
flight_dfs[0].count()

71

# concatenating and appending rows 

In [None]:
#df.union(new_df)

# Sorting Rows

In [73]:
flight_df.sort('count').show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|          Moldova|      United States|    1|
|    United States|            Croatia|    1|
+-----------------+-------------------+-----+
only showing top 2 rows



In [76]:
flight_df.orderBy(expr("count desc"),"DEST_COUNTRY_NAME").show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|        Burkina Faso|      United States|    1|
|       Cote d'Ivoire|      United States|    1|
|              Cyprus|      United States|    1|
|            Djibouti|      United States|    1|
|           Indonesia|      United States|    1|
|                Iraq|      United States|    1|
|              Kosovo|      United States|    1|
|               Malta|      United States|    1|
|             Moldova|      United States|    1|
|       New Caledonia|      United States|    1|
|Saint Vincent and...|      United States|    1|
|            Suriname|      United States|    1|
|       United States|            Estonia|    1|
|       United States|   Papua New Guinea|    1|
|       United States|             Cyprus|    1|
|       United States|            Bahrain|    1|
|       United States|          Lithuania|    1|
|       United State