In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [None]:
# Stop a session left over from a previous run so the builder below starts
# fresh. On a clean kernel `spark` does not exist yet, so skip instead of
# failing with NameError.
try:
    spark.stop()
except NameError:
    pass

In [None]:
# Build (or reuse) a SparkSession named "MyApp" running locally on all cores,
# and keep its SparkContext around for the RDD API.
spark = (
    SparkSession.builder
    .appName("MyApp")
    .master('local[*]')
    .getOrCreate()
)
sc = spark.sparkContext

In [None]:
# For Jupyter
%pylab inline
!export PYSPARK_DRIVER_PYTHON='jupyter'
!export PYSPARK_DRIVER_PYTHON_OPTS='notebook'

# If you want to time queries with %%time
#spark.conf.set('spark.sql.repl.eagerEval.enabled', True) 
#spark.conf.set('spark.sql.repl.eagerEval.maxNumRows', 10)

# Overview

The fundamental data structure for PySpark is the **RDD** whose API tries to be as similar to a list as possible.

When the entries in your RDD are sufficiently structured (e.g. tuples of the same length and sequence of data types), you can convert it into a **DataFrame** whose API tries to be as similar to a pandas DataFrame as possible.

While you can do quite a bit with DataFrame methods that only require passing column names, many processes require providing the DataFrame with a **Column** expression. Column expressions are objects that can reference a column as it is or, for example, a column sorted and cast to a different data type. These expressions get passed to DataFrame methods as if they represented a column that already existed in the DataFrame, though the DataFrame must first build the column defined by the expression before doing anything with it. Column expressions can define a column that is a transformation of each row, a shuffling of all rows, a reduction of rows, a reduction of columns, etc. Column expressions can be built with DataFrame methods, but they are mainly built from functions in the `pyspark.sql.functions` module.

Column expressions can be passed not only to DataFrames but also to **GroupedData** objects, which result from applying group-by operations to a DataFrame. Their behavior is essentially the same.

Finally, some Column expressions are built using **WindowSpec** objects, which yields window expressions (e.g. `F.row_number().over(Window.orderBy('col1'))`).

## RDD

## DataFrames

In [None]:
from datetime import datetime, date
import pandas as pd

In [None]:
# Toy pandas table with one column per basic type (int, float, string, date,
# datetime) plus col6, which holds ISO-8601 strings to be parsed later.
data = pd.DataFrame({
    'col1': [1, 2, 3],
    'col2': [2., 3., 4.],
    'col3': [f'string{n}' for n in (1, 2, 3)],
    'col4': [date(2000, month, 1) for month in (1, 2, 3)],
    'col5': [datetime(2000, 1, day, 12, 0) for day in (1, 2, 3)],
    'col6': ['2010-07-19T19:12:12.510'] * 3,
})
df = spark.createDataFrame(data)
df.printSchema()

In [None]:
# Parse col6's ISO-8601 strings into a timestamp column (still named col6)
# and display just that column; `df` itself is left unchanged.
df.select(F.to_timestamp('col6').alias('col6'))

In [None]:
########################################
# Attributes
assert isinstance(df.schema, pyspark.sql.types.StructType)

# col6 holds ISO-8601 strings in `data`; it is only converted to a timestamp
# in a derived DataFrame, so in `df` itself it is still a string column.
assert df.columns == ['col1', 'col2', 'col3', 'col4', 'col5', 'col6']
assert df.dtypes == [
    ('col1', 'bigint'),
    ('col2', 'double'),
    ('col3', 'string'),
    ('col4', 'date'),
    ('col5', 'timestamp'),
    ('col6', 'string'),
]
print('isStreaming  =',df.isStreaming)
print('sql_ctx      =',df.sql_ctx)
print('storageLevel =',df.storageLevel)
print('isLocal()    =',df.isLocal())

In [None]:
########################################
# Viewing
########################################
# Print Summaries
# df.printSchema()
# df.show(first_n_rows, vertical=False)
# df.explain()

# EDA
# df.count()
# df.describe()
# df.summary()
# df.first()
# df.head()
# df.take()
# df.tail()

# Selecting Rows
# df.filter
# df.limit

In [None]:
# F.column is an alias of F.col: both take a column *name* and return a
# Column. They do not accept an existing Column, so F.column(F.col('my_col'))
# is not a valid way to wrap one.
assert str(F.column('my_col')) == str(F.col('my_col'))
F.column('my_col')

In [None]:
# Square col1 using Python's ** operator on a Column expression; the
# resulting column is named after the expression Spark builds from it.
x = F.col('col1') ** 2
df.select(x)

In [None]:
# Same computation, but written as a SQL expression string that F.expr parses.
x = F.expr('POWER(col1,2)')
df.select(x)

In [None]:
# F.col() looks a column up *by name*; it does not parse SQL. Although the
# string form of F.col('col1') ** 2 resembles 'POWER(col1,2)', no column has
# that literal name, so Spark rejects the select during analysis. Use F.expr()
# (previous cell) to parse a SQL expression string instead.
from pyspark.sql.utils import AnalysisException

x = F.col('POWER(col1,2)')
try:
    df.select(x)
except AnalysisException as err:
    print('AnalysisException:', str(err).partition('\n')[0])
else:
    raise AssertionError("expected F.col('POWER(col1,2)') not to resolve")

In [None]:
########################################
# Column objects and column selection
########################################
# Indexing, attribute access and F.col() all yield pyspark.sql.Column objects
assert {type(df['col1']), type(df.col1), type(F.col('col1'))} == {pyspark.sql.Column}
# A fresh Column wrapper is built on every access, so identity never holds
assert not (df['col1'] is df.col1)

#########
# Creating transformer column objects
assert "Column<'my_col'>" == repr(F.col('my_col'))

# Math: Python operators on a Column print like the equivalent SQL text
assert all(
    str(built) == str(F.col(spelled))
    for built, spelled in (
        (F.col('my_col') + 1, '(my_col + 1)'),
        (F.col('my_col') * 2 - 1, '((my_col * 2) - 1)'),
        (F.col('my_col') ** 2, 'POWER(my_col,2)'),
    )
)

# F.abs()
# F.round(), F.ceil(), F.floor()
# F.bround() # Half even rounding instead of half up
# F.exp(), F.expm1(), F.log(), F.log10(), F.log1p(), F.log2()
# F.factorial()
# F.pow(), F.sqrt(), F.cbrt() # cbrt = cube root







# Trig
# F.sin(),   F.cos(),   F.tan()
# F.asin(),  F.acos(),  F.atan(), F.atan2()
# F.sinh(),  F.cosh(),  F.tanh()
# F.asinh(), F.acosh(), F.atanh()
# F.degrees(), F.radians()
# F.toDegrees(), F.toRadians()
# F.hypot()

# Stats
# F.sum()
# F.avg(), F.mean()
# F.stddev()
# F.stddev_pop()
# F.stddev_samp()
# F.skewness()
# F.corr()
# F.covar_pop()
# F.covar_samp()

# Categorical

# Text
# F.format_number()
# F.format_string()
# F.from_csv()
# F.from_json()
# F.get_json_object()

# Sorting
# F.asc(),  F.asc_nulls_first(),  F.asc_nulls_last()
# F.desc(), F.desc_nulls_first(), F.desc_nulls_last()

# Time
# F.add_months()
# F.current_date()
# F.current_timestamp()
# F.date_add()
# F.date_format()
# F.date_sub()
# F.date_trunc()
# F.datediff()
# F.dayofmonth()
# F.dayofweek()
# F.dayofyear()
# F.days()
# F.from_unixtime()
# F.from_utc_timestamp()
# F.minute()
# F.month()
# F.months()
# F.months_between()
# F.next_day()
# F.second()
# F.timestamp_seconds()
# F.weekofyear()
# F.year()
# F.years()

# Array
# F.array_contains()
# F.array_distinct()
# F.array_except()
# F.array_intersect()
# F.array_join()
# F.array_max()
# F.array_min()
# F.array_position()
# F.array_remove()
# F.array_repeat()
# F.array_sort()
# F.array_union()
# F.arrays_overlap()
# F.arrays_zip()
# F.element_at()
# F.exists()
# F.explode()
# F.explode_outer()
# F.flatten()

# Aggregation
# F.aggregate()
# F.collect_list()
# F.collect_set()
# F.count()
# F.countDistinct()
# F.approx_count_distinct() # F.approxCountDistinct() is deprecated
# F.first()
# F.grouping()
# F.grouping_id()

# Type conversion
# F.ascii()
# F.base64()
# F.bin()
# F.conv()
# F.decode()
# F.encode()
# F.hash()
# F.hex()

# Multi-column
# F.array()
# F.coalesce()
# F.concat()
# F.concat_ws()
# F.greatest()

# Window
# F.cume_dist()
# F.dense_rank()

# Other
# F.assert_true()
# F.bitwiseNOT()
# F.broadcast()
# F.expr()
# F.filter()
# F.forall()

# Unsure
# F.bucket()
# F.crc32()
# F.create_map()
# F.functools()

# Unsorted
# F.hour()
# F.hours()
# F.last_day()

# F.initcap()
# F.input_file_name()
# F.instr()
# F.isnan()
# F.isnull()
# F.json_tuple()
# F.kurtosis()
# F.lag()
# F.last()
# F.lead()
# F.least()
# F.length()
# F.levenshtein()
# F.lit()
# F.locate()
# F.lower()
# F.lpad()
# F.ltrim()
# F.map_concat()
# F.map_entries()
# F.map_filter()
# F.map_from_arrays()
# F.map_from_entries()
# F.map_keys()
# F.map_values()
# F.map_zip_with()
# F.max()
# F.md5()
# F.min()
# F.monotonically_increasing_id()
# F.nanvl()
# F.nth_value()
# F.ntile()
# F.overlay()
# F.pandas_udf()
# F.percent_rank()
# F.percentile_approx()
# F.posexplode()
# F.posexplode_outer()
# F.quarter()
# F.raise_error()
# F.rand()
# F.randn()
# F.rank()
# F.regexp_extract()
# F.regexp_replace()
# F.repeat()
# F.reverse()
# F.rint()
# F.row_number()
# F.rpad()
# F.rtrim()
# F.schema_of_csv()
# F.schema_of_json()
# F.sequence()
# F.sha1()
# F.sha2()
# F.shiftLeft()
# F.shiftRight()
# F.shiftRightUnsigned()
# F.shuffle()
# F.signum()
# F.since()
# F.size()
# F.slice()
# F.sort_array()
# F.soundex()
# F.spark_partition_id()
# F.split()
# F.struct()
# F.substring()
# F.substring_index()
# F.sumDistinct()
# F.sys()
# F.to_csv()
# F.to_date()
# F.to_json()
# F.to_str()
# F.to_timestamp()
# F.to_utc_timestamp()
# F.transform()
# F.transform_keys()
# F.transform_values()
# F.translate()
# F.trim()
# F.trunc()
# F.udf()
# F.unbase64()
# F.unhex()
# F.unix_timestamp()
# F.upper()
# F.var_pop()
# F.var_samp()
# F.variance()
# F.warnings()
# F.when()
# F.window()
# F.xxhash64()
# F.zip_with()

# Modifying a column object

# Selecting a column: by name, by Column object, or with df[[...]] -- each
# returns a new DataFrame.
col1_expr = df['col1']
assert(type(df.select('col1'))
    == type(df.select(col1_expr))
    == type(df[['col1']])
    == pyspark.sql.dataframe.DataFrame)

col1_df = df.select('col1')
col1 = [row['col1'] for row in col1_df.collect()]
assert col1 == data['col1'].tolist()
# The builtin all() over a pandas DataFrame iterates its column *labels*
# (always truthy), so it never compared values. Reduce the element-wise
# comparison explicitly over rows and columns instead.
assert (col1_df.toPandas() == data[['col1']]).all().all()
#df.selectExpr

In [None]:
# Show the SQL-like text Spark builds for an arithmetic Column expression.
str(F.col( 'my_col') * 2 - 1)

In [None]:
########################################
# Modifying
########################################
# Add column
# df.withColumn
# df.withColumnRenamed

# Data Cleaning
# df.na is a helper object whose .df attribute points straight back at df
assert df is df.na.df
#df.replace # (df.na.replace)
#df.fillna # (df.na.fill)
#df.drop
#df.dropDuplicates # (df.drop_duplicates)
#df.dropna # (df.na.drop)

# Conversions
#df.rdd
#df.toJSON()
#df.toPandas()

In [None]:
from pyspark.sql import functions as F

In [None]:
########################################
# Calculations
########################################
# Aggregating
# df.agg(*exprs) aggregates over the whole DataFrame and needs at least one
# aggregate Column -- PySpark rejects a bare df.agg() -- so pass one here.
print(df.agg(F.avg('col1')))
print(F.avg('col1'))

# Stats
assert(df.stat.df is df)
#df.cov (df.stat.cov)
#df.approxQuantile (df.stat.approxQuantile)
#df.sampleBy (df.stat.sampleBy)
#df.corr (df.stat.corr)
#df.freqItems (df.stat.freqItems)
#df.crosstab (df.stat.crosstab)

# Functions
from pyspark.sql import functions

# Apply user-defined functions (UDF)
# df.mapInPandas
# @pandas_udf()

In [None]:
########################################
# Grouping
########################################
# df.groupBy (also spelled df.groupby) hands back a GroupedData object
df_grp = df.groupBy('col1')
assert type(df_grp) is pyspark.sql.group.GroupedData

# Group reduce
# df_grp.count()
# df_grp.sum()
# df_grp.avg() # (df_grp.mean())
# df_grp.min()
# df_grp.max()
# df_grp.agg()
# df_grp.apply()
# df_grp.applyInPandas()

# Regroup
# df_grp.pivot()
# df_grp.cogroup()

In [None]:
########################################
# SQL
########################################
#df.createGlobalTempView
#df.createOrReplaceTempView
#df.createTempView
#spark.udf.register()
#spark.sql()

In [None]:
########################################
# I/O
########################################
#df.write
#df.writeTo
#df.writeStream

In [None]:
########################################
# Unsorted methods
########################################
# Same name as RDD
# df.sample
# df.toDF
# df.randomSplit
# df.toLocalIterator
# df.checkpoint
# df.cache
# df.unpersist
# df.persist
# df.join
# df.distinct
# df.union
# df.subtract
# df.repartition
# df.coalesce
# df.foreach
# df.foreachPartition

# Not in RDD
# df.agg
# df.alias
# df.colRegex
# df.crossJoin
# df.cube
# df.exceptAll
# df.hint
# df.inputFiles
# df.intersect
# df.intersectAll
# df.orderBy
# df.registerTempTable
# df.repartitionByRange
# df.rollup
# df.sameSemantics
# df.semanticHash
# df.sort
# df.sortWithinPartitions
# df.transform
# df.unionAll
# df.unionByName
# df.where
# df.withWatermark

## Functions

## Grouping

## Window