In [37]:
import findspark
import os

# Point findspark at the Spark installation. Prefer the SPARK_HOME environment
# variable so the notebook is portable across machines; fall back to the
# original author's local install path when SPARK_HOME is unset.
findspark.init(os.environ.get("SPARK_HOME", "/home/avishek/spark"))
import pyspark
import numpy as np
import pandas as pd
from pyspark.sql.types import StructField, StructType, StringType, LongType
from pyspark.sql.functions import *

In [11]:
# Build (or reuse an existing) SparkSession running in local mode.
spark = (
    pyspark.sql.SparkSession.builder
    .master("local")
    .appName("Basic Structured Operations")
    .getOrCreate()
)

In [13]:
# Load the 2015 flight summary JSON; no schema is given, so Spark infers one.
df = spark.read.json("./data/flight-data/json/2015-summary.json")

In [14]:
# Print the (inferred) schema of df as a tree.
df.printSchema()

root
 |-- DEST_COUNTRY_NAME: string (nullable = true)
 |-- ORIGIN_COUNTRY_NAME: string (nullable = true)
 |-- count: long (nullable = true)



In [15]:
# Count the rows in df; this is an action, so Spark actually reads the data.
df.count()

256

### a) Infer the schema from the dataset

In [16]:
# Re-read the file without a schema and show the StructType Spark infers.
spark.read.json("./data/flight-data/json/2015-summary.json").schema

StructType(List(StructField(DEST_COUNTRY_NAME,StringType,true),StructField(ORIGIN_COUNTRY_NAME,StringType,true),StructField(count,LongType,true)))

### b) Define Schema Manually

If the schema is not defined correctly, the read operation may return incomplete or incorrectly typed data. In production settings, it is therefore best to define the schema manually.

In [34]:
# Explicit schema for the flight summary JSON. The two country columns are
# nullable strings; `count` is declared non-nullable and carries example
# metadata.
# NOTE(review): file-based readers may still treat every field as nullable
# regardless of the declared flag — verify with df.printSchema().
myManualSchema = StructType([
    StructField("DEST_COUNTRY_NAME", StringType(), True),
    StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
    StructField("count", LongType(), False, metadata={"hello": "world"}),
])

In [39]:
# Re-read the same file, enforcing myManualSchema instead of inferring a schema.
df = spark.read.schema(myManualSchema).json("./data/flight-data/json/2015-summary.json")

In [36]:
# Bring only the first 5 rows to the driver as a pandas DataFrame for rich display.
df.limit(5).toPandas()

Unnamed: 0,DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
0,United States,Romania,15
1,United States,Croatia,1
2,United States,Ireland,344
3,Egypt,United States,15
4,United States,India,62


### Access one or more columns

In [63]:
# Indexing a DataFrame with several names is shorthand for select(); this
# returns a new DataFrame holding just these two columns.
df.select("count", "DEST_COUNTRY_NAME")

DataFrame[count: bigint, DEST_COUNTRY_NAME: string]

### Access all column names

In [52]:
# The column names are already available on the loaded DataFrame; there is no
# need to re-read the JSON file (and re-infer its schema) just to list them.
df.columns

['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']

### Access a row and its elements

In [55]:
# Fetch the first Row of df to the driver (head() with no argument is what
# first() delegates to).
myRow = df.head()

In [64]:
# Display the Row; its fields print as keyword=value pairs.
myRow

Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15)

In [62]:
# Access the field by name rather than by position (myRow[2]) so the lookup
# does not silently break if the schema's column order changes.
# Note: `myRow.count` would NOT work here — Row is a tuple subclass, so
# `.count` resolves to tuple.count, not the column.
myRow["count"]

15

### Displaying and selecting columns

In [65]:
# Print the leading rows of df as a text table.
df.show()

+--------------------+-------------------+-----+
|   DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|       United States|            Romania|   15|
|       United States|            Croatia|    1|
|       United States|            Ireland|  344|
|               Egypt|      United States|   15|
|       United States|              India|   62|
|       United States|          Singapore|    1|
|       United States|            Grenada|   62|
|          Costa Rica|      United States|  588|
|             Senegal|      United States|   40|
|             Moldova|      United States|    1|
|       United States|       Sint Maarten|  325|
|       United States|   Marshall Islands|   39|
|              Guyana|      United States|   64|
|               Malta|      United States|    1|
|            Anguilla|      United States|   41|
|             Bolivia|      United States|   30|
|       United States|           Paraguay|    6|
|             Algeri

In [76]:
# Bracket access is required for `count`: df.count is the DataFrame method.
df.select(df["count"]).show(n=2)

+-----+
|count|
+-----+
|   15|
|    1|
+-----+
only showing top 2 rows



In [77]:
# select() also accepts a single list of column names.
df.select(["DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME"]).show(n=2)

+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
+-----------------+-------------------+
only showing top 2 rows



In [85]:
from pyspark.sql.functions import col, column, expr

# The same column referenced two ways: as a SQL expression and via col().
df.select(
    expr("DEST_COUNTRY_NAME"),
    col("DEST_COUNTRY_NAME"),
).show(n=2)

+-----------------+-----------------+
|DEST_COUNTRY_NAME|DEST_COUNTRY_NAME|
+-----------------+-----------------+
|    United States|    United States|
|    United States|    United States|
+-----------------+-----------------+
only showing top 2 rows



In [84]:
# Rename in the projection; equivalent to the SQL form "DEST_COUNTRY_NAME AS destination".
df.select(expr("DEST_COUNTRY_NAME").alias("destination")).show(n=2)

+-------------+
|  destination|
+-------------+
|United States|
|United States|
+-------------+
only showing top 2 rows



In [88]:
# selectExpr takes SQL expression strings; this projects an aliased copy of
# DEST_COUNTRY_NAME (newColumnName) followed by the original column.
# NOTE(review): the saved output below shows different columns and looks stale.
df.selectExpr(
    "DEST_COUNTRY_NAME as newColumnName",
    "DEST_COUNTRY_NAME",
).show(2)

+-----------------+-------------------+-----+-----------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|DEST_COUNTRY_NAME|
+-----------------+-------------------+-----+-----------------+
|    United States|            Romania|   15|    United States|
|    United States|            Croatia|    1|    United States|
+-----------------+-------------------+-----+-----------------+
only showing top 2 rows



In [93]:
# Keep every original column and append two boolean flags computed in SQL.
df.select(
    "*",
    expr("(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry"),
    expr("(count < 5) as low_volume"),
).show(2)

+-----------------+-------------------+-----+-------------+----------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|low_volume|
+-----------------+-------------------+-----+-------------+----------+
|    United States|            Romania|   15|        false|     false|
|    United States|            Croatia|    1|        false|      true|
+-----------------+-------------------+-----+-------------+----------+
only showing top 2 rows



In [116]:
# Whole-table aggregates in one projection: the average of `count` and the
# number of distinct DEST_COUNTRY_NAME values. No alias is given, so Spark
# uses the default expression-derived column names. (A trailing "AS " with no
# alias name is a SQL parse error.)
df_1 = df.selectExpr(
    "avg(count)",
    "count(distinct(DEST_COUNTRY_NAME))",
)

In [110]:
# Display the single-row aggregate result.
df_1.show()

+-----------+---------------------------------+
| avg(count)|count(DISTINCT DEST_COUNTRY_NAME)|
+-----------+---------------------------------+
|1770.765625|                              132|
+-----------+---------------------------------+



In [103]:
# Row with the largest `count` (sort descending, take one).
# Note: take(1)[0] raises IndexError if df is empty.
df.orderBy(df["count"].desc()).take(1)[0]

Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='United States', count=370002)

### Literals, adding and renaming columns

In [104]:
from pyspark.sql.functions import lit

# Append a constant column: lit() wraps a Python value as a Spark literal.
df.select("*", lit(1).alias("One")).show(2)

+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|One|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
+-----------------+-------------------+-----+---+
only showing top 2 rows



In [105]:
# withColumn appends a column (here a constant literal) under the given name.
df.withColumn(colName="numberOne", col=lit(1)).show(n=2)

+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberOne|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|        1|
|    United States|            Croatia|    1|        1|
+-----------------+-------------------+-----+---------+
only showing top 2 rows



In [106]:
# Add a boolean column that is true when origin and destination match.
df.withColumn(
    "withinCountry",
    df["ORIGIN_COUNTRY_NAME"] == df["DEST_COUNTRY_NAME"],
).show(2)

+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
only showing top 2 rows



In [107]:
# Rename one column and list the resulting column names.
df.withColumnRenamed(existing="DEST_COUNTRY_NAME", new="dest").columns

['dest', 'ORIGIN_COUNTRY_NAME', 'count']

In [108]:
# Copy ORIGIN_COUNTRY_NAME into a new column whose name contains spaces and a
# hyphen; such names need backtick escaping inside SQL expression strings.
dfWithLongColName = df.withColumn("This Long Column-Name", df["ORIGIN_COUNTRY_NAME"])

In [119]:
# Build an (unevaluated) Column expression renaming `count` to `poo`.
# NOTE(review): this line is valid PySpark; the saved TypeError output below
# appears stale (likely from an earlier, misspelled method name) — re-run to confirm.
df["count"].alias("poo")

TypeError: 'Column' object is not callable

In [None]:
# Select the long column from the DataFrame that actually has it. Column names
# containing spaces or hyphens must be escaped with backticks inside SQL
# expression strings, otherwise the expression fails to parse/resolve.
# Bound to a new name so dfWithLongColName is not overwritten.
dfLongColOnly = dfWithLongColName.selectExpr("`This Long Column-Name`")

In [120]:
from pyspark.sql import Row

# Small hand-built DataFrame; Alice's height is None, which becomes a null.
df_name = spark.createDataFrame(
    [("Tom", 80), ("Alice", None)],
    schema=["name", "height"],
)

In [121]:
# Project only the `name` column (lazy; shows the resulting DataFrame type).
df_name.select("name")

DataFrame[name: string]

In [125]:
# Inspect the Column expression produced by chaining an ascending sort order and an alias.
df_name.name.asc().alias("poo")

Column<'name ASC NULLS FIRST AS `poo`'>

In [128]:
# Project `name`, sort it ascending, rename it to `poo`, and collect the Rows.
(
    df_name
    .select(df_name["name"])
    .orderBy(df_name["name"].asc())
    .withColumnRenamed("name", "poo")
    .collect()
)

[Row(poo='Alice'), Row(poo='Tom')]