In [1]:
# Load the sample people records once and expose them under both names used
# by later cells. A second spark.read.json call on the same path would only
# re-read the identical file.
people = spark.read.json('resources/people.json')
df = people

In [2]:
# Load the sample users table from its Parquet file.
users_path = 'resources/users.parquet'
users = spark.read.parquet(users_path)

In [3]:
# Load the sample employee records once and expose them under both names used
# by later cells, instead of reading the same JSON file twice.
employees = spark.read.json('resources/employees.json')
df2 = employees

In [4]:
# Preview the people DataFrame as a text table.
people.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [5]:
# Preview the users DataFrame that was loaded from Parquet.
users.show()

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+



In [6]:
# Preview the employees DataFrame.
employees.show()

+-------+------+
|   name|salary|
+-------+------+
|Michael|  3000|
|   Andy|  4500|
| Justin|  3500|
|  Berta|  4000|
+-------+------+



#### agg
* Aggregate on the entire DataFrame without groups (shorthand for df.groupBy().agg())

In [10]:
# Dict form of agg: {column name: aggregate function name}.
df.agg({"age": "max"}).collect()

[Row(max(age)=30)]

In [15]:
# Column-expression form of agg, using the functions module under the alias F.
from pyspark.sql import functions as F
df.agg(F.min(df.age)).collect()

[Row(min(age)=19)]

#### alias
* Returns a new DataFrame with an alias set.

In [17]:
# Import only the helper this cell needs. A wildcard import of
# pyspark.sql.functions would shadow Python builtins such as min, max and abs.
from pyspark.sql.functions import col

# Self-join: each side gets its own alias so that the identically named
# columns can be referenced unambiguously as "<alias>.<column>".
df_as1 = df.alias("df_as1")
df_as2 = df.alias("df_as2")
joined_df = df_as1.join(df_as2, col("df_as1.name") == col("df_as2.name"), 'inner')
joined_df.select("df_as1.name", "df_as2.name", "df_as2.age").collect()

[Row(name=u'Michael', name=u'Michael', age=None),
 Row(name=u'Andy', name=u'Andy', age=30),
 Row(name=u'Justin', name=u'Justin', age=19)]

### Works the same way as RDD
#### cache
#### checkpoint
#### coalesce
#### collect
* Returns rows

#### count
* Returns the number of rows
<hr>
<hr>


#### columns
* Returns all column names as list

In [19]:
# List the DataFrame's column names.
df.columns

['age', 'name']

#### corr
* Calculates the correlation of two columns of a DataFrame as a double value. 
* Currently only supports the Pearson Correlation Coefficient.

In [22]:
# B is always A + 3, i.e. the two columns are perfectly linearly related.
cdf = spark.createDataFrame([(1, 4), (2, 5), (3, 6)], ["A", "B"])

# Correlation between columns A and B, returned as a plain float.
cdf.corr('A','B')

1.0

#### createGlobalTempView
* Creates a global temporary view with this DataFrame.
* The lifetime of this temporary view is tied to this <u>Spark application</u>
* Throws TempTableAlreadyExistsException if the view name already exists in the catalog.

In [23]:
# Register df as a global temporary view named "people".
# Per the notes above, this raises if a view with that name already exists.
df.createGlobalTempView("people")

In [24]:
# The global view is queried through the global_temp qualifier.
# NOTE(review): this rebinds df2, which the loading cell at the top of the
# notebook bound to the employees data; later cells that use df2 see this
# people view instead.
df2 = spark.sql("select * from global_temp.people")

In [25]:
# The view should return the same rows as the DataFrame it was created from.
# Both sides are sorted so that row order does not affect the comparison.
sorted(df.collect()) == sorted(df2.collect())

True

In [26]:
# Materialise the rows read back through the view.
df2.collect()

[Row(age=None, name=u'Michael'),
 Row(age=30, name=u'Andy'),
 Row(age=19, name=u'Justin')]

In [27]:
# Remove the global view registered under the name "people".
spark.catalog.dropGlobalTempView("people")

#### createOrReplaceGlobalTempView
* Creates or replaces a global temporary view using the given name.
* The lifetime of this temporary view is tied to this <u>Spark application.</u>

#### createTempView
* Creates a local temporary view with this DataFrame.
* The lifetime of this temporary table is tied to the SparkSession that was used to create this DataFrame. 
* Throws TempTableAlreadyExistsException if the view name already exists in the catalog.

In [30]:
# Register df as a session-local temporary view and read it back via SQL.
# Per the notes above, createTempView raises if "people" is already registered,
# so re-running this cell in the same session fails.
df.createTempView("people")
# NOTE(review): df2 is rebound to the people view here; it no longer refers to
# the employees data it was bound to in the loading cell.
df2 = spark.sql("select * from people")
# Order-insensitive check that the view yields the same rows as df.
sorted(df.collect()) == sorted(df2.collect())

True

#### createOrReplaceTempView
* Creates or replaces a local temporary view with this DataFrame.
* The lifetime of this temporary table is tied to the SparkSession that was used to create this DataFrame.

#### describe
* Computes statistics for numeric and string columns.
* These include count, mean, stddev, min, and max.
* If no columns are given, this function computes statistics for all numerical or string columns.

In [32]:
# No columns are passed, so summary statistics are shown for both age and name.
df.describe().show()

+-------+------------------+-------+
|summary|               age|   name|
+-------+------------------+-------+
|  count|                 2|      3|
|   mean|              24.5|   null|
| stddev|7.7781745930520225|   null|
|    min|                19|   Andy|
|    max|                30|Michael|
+-------+------------------+-------+



#### distinct
* Returns a new DataFrame containing the distinct rows in this DataFrame.

In [34]:
# The three people rows are already unique, so all of them are returned.
df.distinct().collect()

[Row(age=None, name=u'Michael'),
 Row(age=30, name=u'Andy'),
 Row(age=19, name=u'Justin')]

#### drop
* Returns a new DataFrame that drops the specified column

In [38]:
# Re-read the people file into df, then drop a column identified by its name.
df = spark.read.json('resources/people.json')
df.drop('age').collect()

[Row(name=u'Michael'), Row(name=u'Andy'), Row(name=u'Justin')]

In [39]:
# Re-read the people file into df, then drop a column identified by a Column
# reference instead of a name.
df = spark.read.json('resources/people.json')
df.drop(df.age).collect()

[Row(name=u'Michael'), Row(name=u'Andy'), Row(name=u'Justin')]

In [40]:
# Join people with employees on name, then drop the people-side name column so
# that only one `name` column remains in the result.
# `employees` is referenced explicitly: earlier cells rebind `df2` to a SQL view
# over the people data, which has no salary column to join in.
df.join(employees, df.name == employees.name, 'inner').drop(df.name).collect()

[Row(age=None, name=u'Michael', salary=3000),
 Row(age=30, name=u'Andy', salary=4500),
 Row(age=19, name=u'Justin', salary=3500)]

In [41]:
# Join people with employees on the shared `name` column and drop columns by
# name. Neither input has a `height` column, so only `age` is actually removed.
# `employees` is referenced explicitly: earlier cells rebind `df2` to a SQL view
# over the people data, which has no salary column.
df.join(employees, 'name', 'inner').drop('age', 'height').collect()

[Row(name=u'Michael', salary=3000),
 Row(name=u'Andy', salary=4500),
 Row(name=u'Justin', salary=3500)]

#### dropDuplicates
* Return a new DataFrame with duplicate rows removed, optionally only considering certain columns.
* For a static batch DataFrame, it just drops duplicate rows.

In [43]:
# Sample data for the dropDuplicates examples: the first two rows are
# identical and the third differs only in age.
from pyspark.sql import Row

alice_rows = [
    Row(name='Alice', age=5, height=80),
    Row(name='Alice', age=5, height=80),
    Row(name='Alice', age=10, height=80),
]
f = sc.parallelize(alice_rows).toDF()

In [45]:
# No subset given: only rows that match on every column are collapsed.
f.dropDuplicates().show()

+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
| 10|    80|Alice|
+---+------+-----+



In [46]:
# Subset given: rows count as duplicates when they agree on name and height,
# regardless of age.
f.dropDuplicates(['name', 'height']).show()

+---+------+-----+
|age|height| name|
+---+------+-----+
|  5|    80|Alice|
+---+------+-----+



#### dropna
* Returns a new DataFrame omitting rows with null values
* how – ‘any’ or ‘all’. If ‘any’, drop a row if it contains any nulls. If ‘all’, drop a row only if all its values are null.
* thresh – int, default None. If specified, drop rows that have fewer than thresh non-null values. This overrides the how parameter.
* subset – optional list of column names to consider.

In [47]:
# how='any' removes every row that contains at least one null value.
df.na.drop(how='any').show()

+---+------+
|age|  name|
+---+------+
| 30|  Andy|
| 19|Justin|
+---+------+



#### explain
* Prints the (logical and physical) plans to the console for debugging purpose.

In [48]:
# Print the execution plan for df.
df.explain()

== Physical Plan ==
*FileScan json [age#524L,name#525] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/tensorFolder/pyspark/resources/people.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<age:bigint,name:string>


#### fillna
* Replace null values

In [51]:
# Small frame with one null in each column.
df4 = spark.createDataFrame([(None, "joseph"), (12, "james"), (31, None)], ["age", "name"])
# Per-column replacement values are passed as a dict: {column name: fill value}.
df4.na.fill({'age': 50, 'name': 'unknown'}).show()

+---+-------+
|age|   name|
+---+-------+
| 50| joseph|
| 12|  james|
| 31|unknown|
+---+-------+



#### filter
* Filters rows using the given condition.
* where() is an alias for filter().

In [53]:
# Condition given as a Column expression.
df.filter(df.age > 20).collect()

[Row(age=30, name=u'Andy')]

In [54]:
# where() used in place of filter(), again with a Column expression.
df.where(df.age == 19).collect()

[Row(age=19, name=u'Justin')]

In [55]:
# Condition given as a SQL expression string.
df.where("age = 19").collect()

[Row(age=19, name=u'Justin')]

### Same as RDD
#### first
#### foreach
#### foreachPartition

#### groupBy
* Groups the DataFrame using the specified columns, so we can run aggregation on them. 
* See GroupedData for all the available aggregate functions.

In [57]:
# groupBy() with no columns aggregates over the whole DataFrame as one group.
df.groupBy().avg().collect()

[Row(avg(age)=24.5)]

In [58]:
# Mean age per name, using the dict form of agg.
df.groupBy('name').agg({'age': 'mean'}).collect()

[Row(name=u'Michael', avg(age)=None),
 Row(name=u'Andy', avg(age)=30.0),
 Row(name=u'Justin', avg(age)=19.0)]

In [59]:
# Grouping column given as a Column object; avg() is called with no arguments.
df.groupBy(df.name).avg().collect()

[Row(name=u'Michael', avg(age)=None),
 Row(name=u'Andy', avg(age)=30.0),
 Row(name=u'Justin', avg(age)=19.0)]

In [60]:
# Grouping columns given as a list that mixes a column name and a Column object.
df.groupBy(['name', df.age]).count().collect()

[Row(name=u'Andy', age=30, count=1),
 Row(name=u'Michael', age=None, count=1),
 Row(name=u'Justin', age=19, count=1)]

#### head
* Returns the first n rows (a single Row when n is omitted).

In [7]:
# Called without an argument, head() returns a single Row rather than a list.
df.head()

Row(age=None, name=u'Michael')

#### intersect
* Returns dataframe with common rows

In [8]:
#### join

In [10]:
# Full outer join of people with employees, keeping only the people-side name.
# Employees without a matching person appear as a row whose name is None.
# `employees` is referenced explicitly: earlier cells rebind `df2` to a SQL view
# over the people data, which would turn this into a people-to-people join.
df.join(employees, df.name == employees.name, 'outer').select(df.name).collect()

[Row(name=u'Michael'), Row(name=u'Andy'), Row(name=None), Row(name=u'Justin')]

In [11]:
#### schema

In [13]:
# Inspect the DataFrame's schema as a StructType.
df.schema

StructType(List(StructField(age,LongType,true),StructField(name,StringType,true)))

In [14]:
#### select

In [15]:
# '*' selects every column.
df.select('*').collect()

[Row(age=None, name=u'Michael'),
 Row(age=30, name=u'Andy'),
 Row(age=19, name=u'Justin')]

In [16]:
# Select a plain column alongside a computed expression that is named via alias().
df.select(df.name, (df.age + 10).alias('age')).collect()

[Row(name=u'Michael', age=None),
 Row(name=u'Andy', age=40),
 Row(name=u'Justin', age=29)]

In [17]:
#### selectExpr

In [18]:
# selectExpr takes SQL expression strings instead of Column objects.
df.selectExpr("age * 2", "abs(age)").collect()

[Row((age * 2)=None, abs(age)=None),
 Row((age * 2)=60, abs(age)=30),
 Row((age * 2)=38, abs(age)=19)]

In [None]:
#### sort

In [27]:
# Descending sort on age; the row with a null age ends up last here.
df.sort(df.age.desc()).collect()

[Row(age=30, name=u'Andy'),
 Row(age=19, name=u'Justin'),
 Row(age=None, name=u'Michael')]

In [28]:
# orderBy produces the same ordering as sort for this input.
df.orderBy(df.age.desc()).collect()

[Row(age=30, name=u'Andy'),
 Row(age=19, name=u'Justin'),
 Row(age=None, name=u'Michael')]

In [29]:
# Import only asc, the one helper the following sort example uses. A wildcard
# import of pyspark.sql.functions would shadow Python builtins such as min,
# max and abs.
from pyspark.sql.functions import asc

In [30]:
# Ascending sort using the asc() function; the row with a null age comes first here.
df.sort(asc("age")).collect()

[Row(age=None, name=u'Michael'),
 Row(age=19, name=u'Justin'),
 Row(age=30, name=u'Andy')]

In [19]:
#### withColumn

In [20]:
# Add a derived column age2 = age + 2; a null age yields a null age2.
df.withColumn('age2', df.age + 2).collect()

[Row(age=None, name=u'Michael', age2=None),
 Row(age=30, name=u'Andy', age2=32),
 Row(age=19, name=u'Justin', age2=21)]

In [None]:
##### withColumnRenamed

In [21]:
# Rename column age to age2; the values are unchanged.
df.withColumnRenamed('age', 'age2').collect()

[Row(age2=None, name=u'Michael'),
 Row(age2=30, name=u'Andy'),
 Row(age2=19, name=u'Justin')]

#### storageLevel
#### subtract
#### take
#### union
#### unpersist

In [23]:
#### toDF

In [24]:
# Rename all columns positionally: age becomes f1 and name becomes f2.
df.toDF('f1', 'f2').collect()

[Row(f1=None, f2=u'Michael'), Row(f1=30, f2=u'Andy'), Row(f1=19, f2=u'Justin')]

In [25]:
#### toPandas()

In [26]:
# Convert to a pandas DataFrame. In the output the ages are shown as floats and
# the missing age is blank.
df.toPandas()

Unnamed: 0,age,name
0,,Michael
1,30.0,Andy
2,19.0,Justin
