In [1]:
# drop($col1)
#   Drop a column within your DataFrame
#   In the example below, we have dropped the "level" column from our DataFrame
# NOTE(review): this cell sits above the cell that creates `sw` and repeats the
#   drop() example that appears later in the notebook. On a fresh kernel `sw` is
#   not yet defined here, so the cell would fail with a NameError. It looks like
#   a leftover that should be removed or moved -- confirm.
# sw.drop("level").show()
display(sw.drop("level"))

## Learning PySpark
### Chapter 4: DataFrames Operations
This notebook contains sample code from Chapter 4 of *Learning PySpark*, focusing on PySpark DataFrame functions.

#### Generating data to be used for the various functions

In [4]:
# Generate our own JSON data for the swimmers
#   This way we don't have to access the file system yet.
#   Each element of the tuple is one complete JSON document describing a swimmer.
#   Simone's level (-4) has no counterpart in the level data defined in this
#   notebook (-1, -2, -3), so she has no match when joining on "level".
stringJSONRDD = sc.parallelize((""" 
  { "id": "123",
    "name": "Katie",
    "age": 19,
    "eyeColor": "brown",
    "goldDate": "2005-01-22",
    "level": -1
  }""",
   """{
    "id": "234",
    "name": "Michael",
    "age": 22,
    "eyeColor": "green",
    "goldDate": "2011-11-12",
    "level": -2
  }""", 
  """{
    "id": "345",
    "name": "Simone",
    "age": 23,
    "eyeColor": "blue",
    "goldDate": "2008-06-07",
    "level": -4
  }""")
)


# Generate our own JSON data for the level lookup
#   This way we don't have to access the file system yet.
#   Each element of the tuple is one JSON document mapping a numeric "level"
#   to its "levelName".
#   Level -3 (Bronze) is not used by any swimmer defined in this notebook, so it
#   has no match when joining on "level".
stringLevelRDD = sc.parallelize((""" 
  { 
    "level": -1,
    "levelName": "Gold"
  }""",
"""{
    "level": -2,
    "levelName": "Silver"
  }""",
"""{
    "level": -3,
    "levelName": "Bronze"
  }""")
)

In [5]:
# Create DataFrames
#   One DataFrame is built per RDD of JSON strings, in this order:
#   sw <- stringJSONRDD (swimmers), lv <- stringLevelRDD (levels)
sw, lv = (
  spark.read.json(json_rdd)
  for json_rdd in (stringJSONRDD, stringLevelRDD)
)

In [6]:
# Include pyspark.sql.functions
from pyspark.sql import functions as F
from pyspark.sql.functions import *

#### Display Data
Take a quick view of the DataFrames

In [8]:
# Render the swimmers DataFrame.
# NOTE(review): display() is not defined in this notebook -- presumably the
#   hosting environment's rich-display helper; confirm.
display(sw)

In [9]:
# Render the levels DataFrame.
# NOTE(review): display() is not defined in this notebook -- presumably the
#   hosting environment's rich-display helper; confirm.
display(lv)

### Display Operations

In [11]:
# collect()
#   Returns all of the rows in your DataFrame as a list of Row objects.
#   Every row is brought back to the driver, so use it only on small DataFrames
#   such as this three-row example.
sw.collect()

In [12]:
# columns
#   Returns the column names of your DataFrame as a list.
#   Note that `columns` is a property, not a method, so it is accessed without parentheses.
sw.columns

In [13]:
# head(n=None)
#   Returns the first n rows as a list of Row objects
#   (here: the first 2 rows of the swimmers DataFrame).
sw.head(2)

In [14]:
# show(n=20)
#   Prints the first n rows of your DataFrame in tabular format; use .show(<n>)
#   to choose the number of rows. By default n = 20.
sw.show()

### Query Operations

In [16]:
# filter(condition)
#    Filters rows using the given condition.
#    The condition here is a SQL expression string; it keeps the swimmers older than 20.
sw.filter("age > 20").show()

In [17]:
# first()
#   Returns the first row of the DataFrame as a single Row object (not a list).
sw.first()

In [18]:
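# isLocal()
#   Returns True if collect() and take() can be run locally (without any Spark executors).
#   The call is left commented out in this notebook.
#sw.isLocal()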


In [19]:
# where(condition)
#    Alias for filter(); the condition here keeps the swimmers aged 20 or younger.
sw.where("age <= 20").show()

### Join Operations

In [22]:
# Join [Inner (default)] swimmers (sw) and level (lv) DataFrames
#   Only swimmers whose `level` also appears in lv are kept
#   (with this notebook's sample data: Katie and Michael).
display(
  sw.join(lv, on=sw.level == lv.level, how="inner")
    .select(sw.name, sw.eyeColor, lv.levelName)
)

In [23]:
# Join [Left Outer] swimmers (sw) and level (lv) DataFrames
#   Every swimmer is kept; a swimmer whose `level` is missing from lv
#   (Simone in this notebook's sample data) gets a null levelName.
display(
  sw.join(lv, on=sw.level == lv.level, how="left_outer")
    .select(sw.name, sw.eyeColor, lv.levelName)
)

In [24]:
# Join [Outer] swimmers (sw) and level (lv) DataFrames
#   Rows from both sides are kept; the side without a match is filled with nulls
#   (in this notebook's sample data: Simone has no level name, Bronze has no swimmer).
display(
  sw.join(lv, on=sw.level == lv.level, how="outer")
    .select(sw.name, sw.eyeColor, lv.levelName)
)

In [25]:
# Join [Right Outer] swimmers (sw) and level (lv) DataFrames
#   Every level is kept; a level that no swimmer has
#   (Bronze in this notebook's sample data) gets null swimmer columns.
display(
  sw.join(lv, on=sw.level == lv.level, how="right_outer")
    .select(sw.name, sw.eyeColor, lv.levelName)
)

In [26]:
# Join [Left Semi] swimmers (sw) and level (lv) DataFrames
#   Keeps the swimmers that have a matching level in lv.
#   Only columns of sw are selected here (sw.level rather than lv.levelName).
display(
  sw.join(lv, on=sw.level == lv.level, how="leftsemi")
    .select(sw.name, sw.eyeColor, sw.level)
)

In [27]:
# intersect(other)
#    Returns a new DataFrame containing only the rows that appear in both this DataFrame and the other DataFrame.

In [28]:
# Create the sw2 DataFrame, which is missing a row:
#   the age filter drops every swimmer younger than 22
#   (Katie, aged 19, in this notebook's sample data).
sw2 = sw.filter("age >= 22")

In [29]:
# Show the intersection of the sw and sw2 DataFrames
#   sw2 is a filtered subset of sw, so the result contains the rows of sw2.
sw.intersect(sw2).show()

### Aggregate and Grouping Operations

In [31]:
# agg
#   Aggregates over the entire DataFrame without groups;
#   shorthand for df.groupBy().agg().
#   The dict maps a column name to the aggregate function to apply (here: max of age).
sw.agg({"age":"max"}).show()

In [32]:
# groupBy
#    Group By statement: groups the rows by `name`, then computes max(age) per group.
sw.groupBy('name').agg({'age': 'max'}).show()

### Cache and Plan Operations

In [34]:
# Cache the data for faster query operations
#   Persists the DataFrame with the default storage level.
#   NOTE(review): for DataFrames the documented default has been MEMORY_AND_DISK
#   since Spark 2.0 (MEMORY_ONLY is the RDD default) -- confirm for the Spark
#   version in use.
sw.cache()

In [35]:
# explain(extended=False)
#   Prints the plans of a DataFrame to the console, which is useful for debugging.
#   With `extended=True`, both the logical and the physical plans are printed.

# Build the DataFrame to inspect: swimmers left-outer-joined to their level names
df = (
  sw.join(lv, on=sw.level == lv.level, how="left_outer")
    .select(sw.name, sw.eyeColor, lv.levelName)
)

# Print the extended plan
df.explain(extended=True)

### Column Operations

In [37]:
# alias("$name")
#    Provides an alias (a new column name) for a column expression in your DataFrame.
#    Spark's abs() is called through the `F` module alias so that it is explicit
#    that the column function -- not Python's built-in abs(), whose name the
#    wildcard import of pyspark.sql.functions shadows -- is being applied.
sw.select(
  sw.level,
  F.abs(sw.level).alias('abs_level')
).show()

In [38]:
# drop($col1)
#   Returns a new DataFrame without the named column; `sw` itself is not modified.
#   In the example below, we have dropped the "level" column from our DataFrame
# Plain-text alternative to display():
# sw.drop("level").show()
display(sw.drop("level"))

### Partition Operations

In [40]:
# Get the number of partitions
#  Note this is an RDD operation, so it is reached through the DataFrame's
#  underlying RDD (`sw.rdd`).
sw.rdd.getNumPartitions()

In [41]:
# Coalesce
#  Merge partitions together with df.coalesce(n); the partition count of the
#  coalesced DataFrame is then read back through its underlying RDD.
sw.coalesce(2).rdd.getNumPartitions()

In [42]:
# createOrReplaceTempView("$name$")
#   Create or replace a temporary view (will be available for the life of the `Spark Session`).
#   Because an existing view of the same name is replaced, this cell is safe to re-run.
sw.createOrReplaceTempView("swimmers")

In [43]:
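# createTempView("$name$")
#   Creates a temporary view (will be available for the life of the `Spark Session`)
#   Unlike createOrReplaceTempView, this raises an exception if a temporary view with
#   the same name already exists, which is the case here because "swimmers" was
#   created in the previous cell. The call is therefore left commented out.
#sw.createTempView("swimmers")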

In [44]:
# dropTempView("$name$")
#   Drop the temporary view "swimmers"; note that the call is made on the
#   `spark` session's catalog rather than on the DataFrame.
#   This replaces the legacy sqlContext.dropTempTable("swimmers") call so that the
#   cell uses the same `spark` entry point as the rest of the notebook and does
#   not depend on a separate `sqlContext` global being defined.
spark.catalog.dropTempView("swimmers")

In [45]:
# distinct()
#   Returns a new DataFrame containing the distinct rows of your DataFrame.
#   The result is not shown here; append .show() to print its rows.
sw.distinct()

In [46]:
# dropDuplicates
#   Drop duplicates from your DataFrame based on the columns specified (in a List)
#   If no columns are specified, then duplicate rows are dropped based on all columns
#   Here rows count as duplicates when both `name` and `eyeColor` match.
#   The result is not shown here; append .show() to print its rows.
sw.dropDuplicates(['name', 'eyeColor'])

In [47]:
# drop_duplicates
#   Alias for dropDuplicates; the call below is equivalent to
#   sw.dropDuplicates(['name', 'eyeColor']).
sw.drop_duplicates(['name', 'eyeColor'])

### Null Operations

In [49]:
# dropna(how='any', thresh=None, subset=None)
#   Produces a new DataFrame without the rows that contain null values.

# Outer-join swimmers and levels so that the result contains nulls
df = (
  sw.join(lv, on=sw.level == lv.level, how="outer")
    .select(sw.name, sw.eyeColor, lv.levelName)
)

# Keep only the rows that have no nulls
df.dropna().show()

In [50]:
# fillna(value, subset=None)
#   Substitutes a value for nulls. DataFrame.fillna() and
#   DataFrameNaFunctions.fill() (reached via df.na.fill()) are aliases of each other.

# Outer-join swimmers and levels so that the result contains nulls
df = (
  sw.join(lv, on=sw.level == lv.level, how="outer")
    .select(sw.name, sw.eyeColor, lv.levelName)
)

# To replace the nulls of the `levelName` column only:
#df.na.fill({'levelName': 'unknown'}).show()

# Replace the nulls of all columns
df.fillna('unknown').show()