In [48]:
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg
import pyspark.sql.functions as F

spark = SparkSession.builder \
    .appName("learning") \
    .getOrCreate()


spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark.conf.set("spark.sql.repl.eagerEval.maxNumRows", 5)

In [49]:
import os
current_directory = os.getcwd()
print("Current folder:", f"{current_directory}/work")

print("Files here:", os.listdir(f"{current_directory}/work"))

Current folder: /home/jovyan/work
Files here: ['a_spark_the_definitive_guide', '.git', 'README.md']


In [53]:
base_data_path = "work/a_spark_the_definitive_guide/data"

# schema definition
flight_schema = StructType([
    StructField("DEST_COUNTRY_NAME", StringType(), True ),
    StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
    StructField("count", LongType(), False)
])


df = spark.read.format("json").schema(flight_schema).load(f"{base_data_path}/flight-data/json/2015-summary.json")
df

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344
Egypt,United States,15
United States,India,62


In [54]:
group_cols = ["DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME"]
by_count_country = (
                    df.groupBy("DEST_COUNTRY_NAME")
                    #df.groupBy(group_cols)
                   .agg(F.sum(F.col("count")).alias("COUNTRY_COUNT"))
                   .sort(F.col("COUNTRY_COUNT"), ascending=False)
                )
by_count_country

DEST_COUNTRY_NAME,COUNTRY_COUNT
United States,411352
Canada,8399
Mexico,7140
United Kingdom,2025
Japan,1548


In [58]:
# select
single_col = df.select("DEST_COUNTRY_NAME","count")
single_col

DEST_COUNTRY_NAME,count
United States,15
United States,1
United States,344
Egypt,15
United States,62


In [None]:
# accessing dataframe column
df.columns

['DEST_COUNTRY_NAME', 'ORIGIN_COUNTRY_NAME', 'count']

In [57]:
# creating temporary view
df.createOrReplaceTempView("dfTable")

In [None]:
## Usin selectExpr
df_with_mod_column_names = (
    df.selectExpr(
    "DEST_COUNTRY_NAME as destination", 
    "ORIGIN_COUNTRY_NAME as origin", 
    "count as flight_count"
)
)
df_with_mod_column_names

destination,origin,flight_count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344
Egypt,United States,15
United States,India,62


In [67]:
df_doubled_count = df.withColumn("Traffic Cat", 
                                 F.when(F.col("count")<100, "High Traffic").otherwise("Low")
)
df_doubled_count 

DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count,Traffic Cat
United States,Romania,15,High Traffic
United States,Croatia,1,High Traffic
United States,Ireland,344,Low
Egypt,United States,15,High Traffic
United States,India,62,High Traffic
