# DataFrame FAQs

In [1]:
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.session import SparkSession
spark = SparkSession.builder.master("local[*]").appName("jdbc data sources").getOrCreate()

In [2]:
df = spark.read.format("csv").options(header='true', delimiter = '|').load("tmp/dataframe_sample.csv")
df.printSchema()
df.show()

root
 |-- id: string (nullable = true)
 |-- end_date: string (nullable = true)
 |-- start_date: string (nullable = true)
 |-- location: string (nullable = true)

+---+-------------------+-------------------+--------+
| id|           end_date|         start_date|location|
+---+-------------------+-------------------+--------+
|  1|2015-10-14 00:00:00|2015-09-14 00:00:00|   CA-SF|
|  2|2015-10-15 01:00:20|2015-08-14 00:00:00|   CA-SD|
|  3|2015-10-16 02:30:00|2015-01-14 00:00:00|   NY-NY|
|  4|2015-10-17 03:00:20|2015-02-14 00:00:00|   NY-NY|
|  5|2015-10-18 04:30:00|2014-04-14 00:00:00|   CA-SD|
+---+-------------------+-------------------+--------+



In [3]:
# Instead of registering a UDF, call the builtin functions to perform operations on the columns.
# This will provide a performance improvement as the builtins compile and run in the platform's JVM.

# Convert to a Date type
df = df.withColumn('date', F.to_date(df.end_date))

# Parse out the date only
df = df.withColumn('date_only', F.regexp_replace(df.end_date,' (\d+)[:](\d+)[:](\d+).*$', ''))

# Split a string and index a field
df = df.withColumn('city', F.split(df.location, '-')[1])

# Perform a date diff function
df = df.withColumn('date_diff', F.datediff(F.to_date(df.end_date), F.to_date(df.start_date)))

In [4]:
df.registerTempTable("sample_df")
display(spark.sql("select * from sample_df"))
spark.sql("select * from sample_df").show()

DataFrame[id: string, end_date: string, start_date: string, location: string, date: date, date_only: string, city: string, date_diff: int]

+---+-------------------+-------------------+--------+----------+----------+----+---------+
| id|           end_date|         start_date|location|      date| date_only|city|date_diff|
+---+-------------------+-------------------+--------+----------+----------+----+---------+
|  1|2015-10-14 00:00:00|2015-09-14 00:00:00|   CA-SF|2015-10-14|2015-10-14|  SF|       30|
|  2|2015-10-15 01:00:20|2015-08-14 00:00:00|   CA-SD|2015-10-15|2015-10-15|  SD|       62|
|  3|2015-10-16 02:30:00|2015-01-14 00:00:00|   NY-NY|2015-10-16|2015-10-16|  NY|      275|
|  4|2015-10-17 03:00:20|2015-02-14 00:00:00|   NY-NY|2015-10-17|2015-10-17|  NY|      245|
|  5|2015-10-18 04:30:00|2014-04-14 00:00:00|   CA-SD|2015-10-18|2015-10-18|  SD|      552|
+---+-------------------+-------------------+--------+----------+----------+----+---------+



### I want to convert the DataFrame back to JSON strings to send back to Kafka.

In [5]:
#There is an underlying toJSON() function that returns an RDD of JSON strings using the column
#names and schema to produce the JSON records.
rdd_json = df.toJSON()
rdd_json.take(2)

['{"id":"1","end_date":"2015-10-14 00:00:00","start_date":"2015-09-14 00:00:00","location":"CA-SF","date":"2015-10-14","date_only":"2015-10-14","city":"SF","date_diff":30}',
 '{"id":"2","end_date":"2015-10-15 01:00:20","start_date":"2015-08-14 00:00:00","location":"CA-SD","date":"2015-10-15","date_only":"2015-10-15","city":"SD","date_diff":62}']

### My UDF takes a parameter including the column to operate on. How do I pass this parameter?

In [6]:
from pyspark.sql.functions import udf
#There is a function available called lit() that creates a constant column.
add_n = udf(lambda x, y: x + y, IntegerType())

# We register a UDF that adds a column to the DataFrame, and we cast the id column to an Integer type.
df = df.withColumn('id_offset', add_n(F.lit(1000), df.id.cast(IntegerType())))
display(df)
df.show()

DataFrame[id: string, end_date: string, start_date: string, location: string, date: date, date_only: string, city: string, date_diff: int, id_offset: int]

+---+-------------------+-------------------+--------+----------+----------+----+---------+---------+
| id|           end_date|         start_date|location|      date| date_only|city|date_diff|id_offset|
+---+-------------------+-------------------+--------+----------+----------+----+---------+---------+
|  1|2015-10-14 00:00:00|2015-09-14 00:00:00|   CA-SF|2015-10-14|2015-10-14|  SF|       30|     1001|
|  2|2015-10-15 01:00:20|2015-08-14 00:00:00|   CA-SD|2015-10-15|2015-10-15|  SD|       62|     1002|
|  3|2015-10-16 02:30:00|2015-01-14 00:00:00|   NY-NY|2015-10-16|2015-10-16|  NY|      275|     1003|
|  4|2015-10-17 03:00:20|2015-02-14 00:00:00|   NY-NY|2015-10-17|2015-10-17|  NY|      245|     1004|
|  5|2015-10-18 04:30:00|2014-04-14 00:00:00|   CA-SD|2015-10-18|2015-10-18|  SD|      552|     1005|
+---+-------------------+-------------------+--------+----------+----------+----+---------+---------+



In [7]:
# any constants used by UDF will automatically pass through to workers
N = 90
last_n_days = udf(lambda x: x < N, BooleanType())

df_filtered = df.filter(last_n_days(df.date_diff))
display(df_filtered)
df_filtered.show()

DataFrame[id: string, end_date: string, start_date: string, location: string, date: date, date_only: string, city: string, date_diff: int, id_offset: int]

+---+-------------------+-------------------+--------+----------+----------+----+---------+---------+
| id|           end_date|         start_date|location|      date| date_only|city|date_diff|id_offset|
+---+-------------------+-------------------+--------+----------+----------+----+---------+---------+
|  1|2015-10-14 00:00:00|2015-09-14 00:00:00|   CA-SF|2015-10-14|2015-10-14|  SF|       30|     1001|
|  2|2015-10-15 01:00:20|2015-08-14 00:00:00|   CA-SD|2015-10-15|2015-10-15|  SD|       62|     1002|
+---+-------------------+-------------------+--------+----------+----------+----+---------+---------+



### I have a table in the Hive metastore and I’d like to access to table as a DataFrame. 
### What’s the best way to define this?

In [8]:
# There are multiple ways to define a DataFrame from a registered table. Syntax show below.
# Call table(tableName) or select and filter specific columns using an SQL query.
# Both return DataFrame types
df_1 = spark.table("sample_df")
df_2 = spark.sql("select * from sample_df")

### I’d like to clear all the cached tables on the current cluster.

In [9]:
# There’s an API available to do this at a global level or per table.
spark.catalog.clearCache()

### I’d like to compute aggregates on columns. What’s the best way to do this?

In [10]:
#There’s an API named agg(*exprs) that takes a list of column names and expressions for the type 
#of aggregation you’d like to compute. Documentation is available here. 
#You can leverage the built-in functions that mentioned above as part of the expressions for each column.
# Provide the min, count, and avg and groupBy the location column. Diplay the results
agg_df = df.groupBy("location").agg(F.min("id"), F.count("id"), F.avg("date_diff"))
display(agg_df)
agg_df.show()

DataFrame[location: string, min(id): string, count(id): bigint, avg(date_diff): double]

+--------+-------+---------+--------------+
|location|min(id)|count(id)|avg(date_diff)|
+--------+-------+---------+--------------+
|   NY-NY|      3|        2|         260.0|
|   CA-SF|      1|        1|          30.0|
|   CA-SD|      2|        2|         307.0|
+--------+-------+---------+--------------+



### I’d like to write out the DataFrames to Parquet, but would like to partition on a particular column.

In [12]:
#You can use the following APIs to accomplish this. Ensure the code does not create a large number 
#of partition columns with the datasets otherwise the overhead of the metadata can cause significant slow downs. 
#If there is a SQL table back by this directory, you will need to call refresh table <table-name> to update 
#the metadata prior to the query.
df = df.withColumn('end_month', F.month('end_date'))
df = df.withColumn('end_year', F.year('end_date'))
df.write.partitionBy("end_year", "end_month").mode("overwrite").parquet("tmp/sample_table")
df.printSchema()
df.show()

root
 |-- id: string (nullable = true)
 |-- end_date: string (nullable = true)
 |-- start_date: string (nullable = true)
 |-- location: string (nullable = true)
 |-- date: date (nullable = true)
 |-- date_only: string (nullable = true)
 |-- city: string (nullable = true)
 |-- date_diff: integer (nullable = true)
 |-- id_offset: integer (nullable = true)
 |-- end_month: integer (nullable = true)
 |-- end_year: integer (nullable = true)

+---+-------------------+-------------------+--------+----------+----------+----+---------+---------+---------+--------+
| id|           end_date|         start_date|location|      date| date_only|city|date_diff|id_offset|end_month|end_year|
+---+-------------------+-------------------+--------+----------+----------+----+---------+---------+---------+--------+
|  1|2015-10-14 00:00:00|2015-09-14 00:00:00|   CA-SF|2015-10-14|2015-10-14|  SF|       30|     1001|       10|    2015|
|  2|2015-10-15 01:00:20|2015-08-14 00:00:00|   CA-SD|2015-10-15|2015-10-15|

### How do I properly handle cases where I want to filter out NULL data?

In [13]:
# You can use filter() and provide similar syntax as you would with a SQL query.
null_item_schema = StructType([StructField("col1", StringType(), True),
                               StructField("col2", IntegerType(), True)])
null_df = spark.createDataFrame([("test", 1), (None, 2)], null_item_schema)
display(null_df.filter("col1 IS NOT NULL"))
null_df.filter("col1 IS NOT NULL").show()

DataFrame[col1: string, col2: int]

+----+----+
|col1|col2|
+----+----+
|test|   1|
+----+----+

