In [1]:
import os
import subprocess
import sys

import pandas as pd
from dotenv import load_dotenv
from pyspark.sql import SparkSession

# Load environment variables from .env first, so that anything defined there
# is already in the process environment when the SparkSession is started
# below (and PROJECT_ROOT is available for the path setup).
load_dotenv()

# Make the project's src/ directory importable from this notebook.
sys.path.append(os.path.join(os.environ['PROJECT_ROOT'], 'src'))

# Create (or reuse) the SparkSession used by every later cell.
spark = SparkSession.builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/18 10:02:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Build a Spark DataFrame from an in-memory pandas DataFrame.
p_df = pd.DataFrame(
    [
        ['red', 'banana', 1, 10],
        ['blue', 'banana', 2, 20],
        ['red', 'carrot', 3, 30],
        ['blue', 'grape', 4, 40],
        ['red', 'carrot', 5, 50],
        ['black', 'carrot', 6, 60],
        ['red', 'banana', 7, 70],
        ['red', 'grape', 8, 80],
    ],
    columns=['color', 'fruit', 'v1', 'v2'],
)
s_df = spark.createDataFrame(p_df)
s_df.show()
# The pandas integer columns arrive in Spark as `long`; compare this schema
# with the one produced by the CSV load in the next cell.
s_df.printSchema()

                                                                                

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+

root
 |-- color: string (nullable = true)
 |-- fruit: string (nullable = true)
 |-- v1: long (nullable = true)
 |-- v2: long (nullable = true)



In [3]:
# Alternatively, write the data to CSV and read it back into a Spark DataFrame.
# Double quotes are used inside the f-string: reusing the outer quote character
# there is only valid from Python 3.12 onward (PEP 701).
data_dir = f"{os.environ['PROJECT_ROOT']}/data"
# exist_ok=True makes the cell safe to re-run. Unlike splitting a shell
# `mkdir` command string, this also handles paths that contain spaces.
os.makedirs(data_dir, exist_ok=True)
p_df.to_csv(f'{data_dir}/spark_data.csv', index=False, header=True)

s_df = spark.read.csv(f'{data_dir}/spark_data.csv', header=True)
s_df.show()
# CSV stores no type information, so without `inferSchema=True` (or an explicit
# schema) Spark reads every column back as string, even those saved from ints.
s_df.printSchema()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+

root
 |-- color: string (nullable = true)
 |-- fruit: string (nullable = true)
 |-- v1: string (nullable = true)
 |-- v2: string (nullable = true)



In [4]:
# The generic reader: the same CSV load, but the format is passed explicitly
# to `load` instead of being implied by a format-specific method like `csv`.
s_df = (
    spark.read
    .format('csv')
    .option('header', True)
    .load(f'{data_dir}/spark_data.csv')
)
s_df.show()
s_df.printSchema()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+

root
 |-- color: string (nullable = true)
 |-- fruit: string (nullable = true)
 |-- v1: string (nullable = true)
 |-- v2: string (nullable = true)



In [5]:
# Project a subset of the columns, referenced by name.
s_df.select('color', 'v2').show()

+-----+---+
|color| v2|
+-----+---+
|  red| 10|
| blue| 20|
|  red| 30|
| blue| 40|
|  red| 50|
|black| 60|
|  red| 70|
|  red| 80|
+-----+---+



In [6]:
# Derive a new `sum` column from v1 and v2.
s_df = s_df.withColumn('sum', s_df['v1'] + s_df['v2'])
s_df.show()
# v1 and v2 are strings here; Spark implicitly casts them for the addition,
# which is why the resulting `sum` column is typed double.
s_df.printSchema()

+-----+------+---+---+----+
|color| fruit| v1| v2| sum|
+-----+------+---+---+----+
|  red|banana|  1| 10|11.0|
| blue|banana|  2| 20|22.0|
|  red|carrot|  3| 30|33.0|
| blue| grape|  4| 40|44.0|
|  red|carrot|  5| 50|55.0|
|black|carrot|  6| 60|66.0|
|  red|banana|  7| 70|77.0|
|  red| grape|  8| 80|88.0|
+-----+------+---+---+----+

root
 |-- color: string (nullable = true)
 |-- fruit: string (nullable = true)
 |-- v1: string (nullable = true)
 |-- v2: string (nullable = true)
 |-- sum: double (nullable = true)



In [7]:
# Conditionally select rows. v1 was read from CSV as a string, so cast it
# explicitly rather than relying on Spark's implicit string-to-number coercion
# for the comparison (whose handling can differ, e.g. under ANSI mode).
s_df.filter(s_df.v1.cast('int') > 6).show()

+-----+------+---+---+----+
|color| fruit| v1| v2| sum|
+-----+------+---+---+----+
|  red|banana|  7| 70|77.0|
|  red| grape|  8| 80|88.0|
+-----+------+---+---+----+



In [8]:
# Per fruit, average every numeric column. Only the derived `sum` column is
# numeric at this point (v1/v2 are strings), so the result is just avg(sum).
avg_df = s_df.groupBy('fruit').avg()
avg_df.show()

+------+------------------+
| fruit|          avg(sum)|
+------+------------------+
| grape|              66.0|
|banana|36.666666666666664|
|carrot|51.333333333333336|
+------+------------------+



In [10]:
# Persist the aggregated result as Parquet, replacing any previous output.
avg_df.write.mode('overwrite').parquet(f'{data_dir}/out.parquet')

                                                                                