# Spark DataFrames and SQL

## DataFrame
 - A distributed immutable collection of data grouped into named columns
 - a two-dimensional labeled data structure with columns of potentially different types

In [1]:
from pyspark.sql import SparkSession

In [2]:
# Build the session through the documented `SparkSession.builder` entry point
# rather than instantiating the nested Builder class by hand.
# getOrCreate() returns the already-running session when one exists and only
# starts a new one otherwise, so re-running this cell is safe.
spark = (
    SparkSession.builder
    .appName("RDD_DataFrames01")
    .getOrCreate()
)
spark

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/02/10 18:53:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Creating Spark DataFrames

In [None]:
from datetime import datetime, date
import pandas as pd

from pyspark.sql import Row

- from a list of Row objects

In [None]:
df1 = spark.createDataFrame(
    [
        Row(a=10, b=2.5, c='Alice', 
            d=date(2022, 1, 31), 
            e=datetime(2022, 1, 1, 10, 0)),
        Row(a=20, b=3.7, c='Bob', 
            d=date(2022, 2, 1), 
            e=datetime(2022, 1, 2, 12, 30)),
        Row(a=30, b=5.9, c='Charlie', 
            d=date(2022, 3, 1), 
            e=datetime(2022, 1, 3, 14, 45)),
        Row(a=40, b=8.2, c='Dave', 
            d=date(2022, 1, 31), 
            e=datetime(2022, 1, 3, 17, 0))
    ]
)

df1

In [None]:
df1.printSchema()

In [None]:
type(df1)

- From a list of tuples: first without a schema (columns get default names), then with an explicit schema

In [None]:
df1 = spark.createDataFrame(
    [
        (10, 2.5, 'Alice', 
            date(2022, 1, 31), 
            datetime(2022, 1, 1, 10, 0)),
        (20, 3.7, 'Bob', 
            date(2022, 2, 1), 
            datetime(2022, 1, 2, 12, 30)),
        (30, 5.9, 'Charlie', 
            date(2022, 3, 1), 
            datetime(2022, 1, 3, 14, 45)),
        (40, 8.2, 'Dave', 
            date(2022, 1, 31), 
            datetime(2022, 1, 3, 17, 0))
    ]
)

df1

In [None]:
df1 = spark.createDataFrame(
    [
        (10, 2.5, 'Alice', 
            date(2022, 1, 31), 
            datetime(2022, 1, 1, 10, 0)),
        (20, 3.7, 'Bob', 
            date(2022, 2, 1), 
            datetime(2022, 1, 2, 12, 30)),
        (30, 5.9, 'Charlie', 
            date(2022, 3, 1), 
            datetime(2022, 1, 3, 14, 45)),
        (40, 8.2, 'Dave', 
            date(2022, 1, 31), 
            datetime(2022, 1, 3, 17, 0))
    ], 
    schema='a long, b double, c string, d date, e timestamp'
)

df1

- From Pandas DataFrame

In [None]:
pandas_df = pd.DataFrame({
    'a': [10, 20, 30,40],
    'b': [2.5, 3.7, 5.9,8.2],
    'c': ['Alice','Bob','Charlie','Dave'],
    'd': [date(2022, 1, 31),date(2022, 2, 1),
          date(2022, 3, 1), date(2022, 1, 31)],
    'e': [datetime(2022, 1, 1, 10, 0),
          datetime(2022, 1, 2, 12, 30),
          datetime(2022, 1, 3, 14, 45),
         datetime(2022, 1, 3, 17, 0)]
})

df1 = spark.createDataFrame(pandas_df)
df1

- From RDD

In [None]:
sc = spark.sparkContext

rdd = sc.parallelize([
        (10, 2.5, 'Alice', 
            date(2022, 1, 31), 
            datetime(2022, 1, 1, 10, 0)),
        (20, 3.7, 'Bob', 
            date(2022, 2, 1), 
            datetime(2022, 1, 2, 12, 30)),
        (30, 5.9, 'Charlie', 
            date(2022, 3, 1), 
            datetime(2022, 1, 3, 14, 45)),
        (40, 8.2, 'Dave', 
            date(2022, 1, 31), 
            datetime(2022, 1, 3, 17, 0))
    
    ])

rdd 

In [None]:
df1 = spark.createDataFrame(
    rdd, 
    schema=['a', 'b', 'c', 'd', 'e'])

df1

## Viewing the DataFrame

### show(n = 20, truncate=True, vertical=False)
 - Viewing the first n rows of the DataFrame

In [None]:
df1.show()

In [None]:
df1.show(2)

In [None]:
df1.show(truncate=5)

In [None]:
df1.show(vertical=True)

In [None]:
# Alternatively

spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

df1

In [None]:
spark.conf.set('spark.sql.repl.eagerEval.maxNumRows', 10)

df1

In [None]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', False)

### first, take, head, tail

In [None]:
# collect all distributed data as local data

df1.collect()

In [None]:
df1.first()

In [None]:
# collect first two

df1.take(2)

In [None]:
# equivalent to

df1.head(2)

In [None]:
# collect last two

df1.tail(2)

### limit(*num*)
 - Limits the result count to the number specified

In [None]:
df1.limit(2)

In [None]:
df1.limit(2).collect()

### printSchema()
  - print the schema in tree format

In [None]:
df1.printSchema()

In [None]:
df1.columns 

### Converting to Pandas DataFrame
  - Could result in Out of Memory error if data is too large

In [None]:
df1.toPandas()

In [None]:
# Each Row can be mapped to a Dictionary

df1.take(2)[1].asDict()

### Summary

 - A DataFrame is an abstraction of the Resilient Distributed dataset (RDD)
    - dealing with higher level optimized functions 
 - It is an efficient table of an RDD with a heavily optimized binary representation of the data.
   - The binary representation is achieved using encoders
     - An encoder serializes the various objects into a binary structure for much better performance than the RDD representation.
 - Performance of PySpark DataFrames with these optimizations is on par with that of either Scala or Java
 - Since a DataFrame uses an RDD internally anyway, a DataFrame is also distributed exactly like an RDD, and thus is also a distributed dataset.
  - Obviously, this also means DataFrames are immutable
  - Every transformation creates a new DataFrame

In [None]:
type(df1), type(df1.first())

## Selecting and Accessing Data
 - Selecting a column returns a Column instance, not the Column data

In [None]:
df1.columns

In [None]:
df1.c

In [None]:
type(df1.c)

In [None]:
df1.select(df1.c)

In [None]:
df1.select(df1.c).show()

In [None]:
df1.select(df1.columns[2])

In [None]:
df1.select(df1.columns[2]).show()

### Select multiple columns

In [None]:
df1.select([df1.a, df1.c])

In [None]:
df1.select(['a', 'c']).show()

In [None]:
df1.select('a', 'c').show()

In [None]:
from pyspark.sql.functions import col

In [None]:
df1.select(col('a'), 
           col('c')).show()

### selectExpr
  - Projects a set of SQL expressions and returns a new DataFrame

In [None]:
df1.selectExpr("a * 5", 
               "year(d) + 1", 
               "hour(e)").show()

### Selecting with expr(...)

In [None]:
from pyspark.sql.functions import expr

In [None]:
df1.select("a", expr("a * 5")).show()

### Using an alias

In [None]:
df1.select(df1.c.alias('FirstName')).show()

In [None]:
df1.select(df1.c.alias('FirstName'), 
           df1.a.alias('Age')).show()

In [None]:
from pyspark.sql.functions import col

In [None]:
df1.select(col('c').alias('FirstName'), 
           col('a').alias('Age')).show()

In [None]:
df1.selectExpr("a * 5 as times5", 
               "year(d) + 1 as NextYear", 
               "hour(e) as HourOfDay").show()

In [None]:
df1.select(col('a').alias('Age'), 
           expr('a * 5').alias('times5')).show()

### withColumn(colName, col) - Adding or replacing a column
  - returns a new DataFrame by adding a column or replacing the existing column that has the same name

In [None]:
df1.withColumn('a10', 10 * df1.a)

In [None]:
df1.withColumn('a10', 10 * df1.a).show()

In [None]:
df1.withColumn('b', df1.b / 2).show()

In [None]:
df1.withColumn('d', df1.d + 31).show()

In [None]:
from pyspark.sql.functions import upper

df1.withColumn('c', upper(df1.c)).show()

### withColumnRenamed(*existing, new*)
  - returns a new DataFrame by renaming an existing column

In [None]:
df1.show()

In [None]:
df1.withColumnRenamed('c', 'Name').show()

### when(condition, value)

In [None]:
df1.show()

In [None]:
from pyspark.sql.functions import when

In [None]:
df1.select(df1.c, df1.a, 
           when(df1.a < 25, 'Youth').
           when(df1.a < 35, 'YoungAdult').
           otherwise('Adult'))

In [None]:
df1.select(df1.c, df1.a, 
           when(df1.a < 25, 'Youth').
           when(df1.a < 35, 'YoungAdult').
           otherwise('Adult').alias('AgeGroup')).show()

## sort, orderBy
 - Returns a new DataFrame sorted by the specified column(s)

In [None]:
df1.show()

In [None]:
df1.sort(df1.d).show()

In [None]:
df1.sort("d", ascending=False).show()

In [None]:
df1.sort(df1.d.desc(), df1.a.desc()).show()

In [None]:
df1.orderBy

In [None]:
df1.orderBy(df1.d.desc(), df1.a.desc()).show()

In [None]:
df1.orderBy("a", ascending=False).limit(2).collect()

## Dropping Columns

In [None]:
df1.show()

In [None]:
df1.drop('b').show()

In [None]:
df1.drop('d', 'e').show()

In [None]:
df1.drop(df1.d).show()

## Loading from external files

### Text files

In [None]:
df_text = spark.read.text('the-zen-of-python.txt')
df_text

In [None]:
df_text.show(4, truncate=False)

In [None]:
from pyspark.sql.functions import split, explode

In [None]:
# Break every line of the text file into an array of space-delimited tokens,
# exposed as a single column called "wordList".
df_wordList = df_text.select(
    split("value", " ").alias("wordList")
)

df_wordList.show(n=4, truncate=False)

In [None]:
df_wordList.printSchema()

- explode: Returns a new row for each element in the given array

In [None]:
df_words = df_wordList.select(
    explode("wordList").alias("word"))

df_words.show(10, truncate=False)
                                           

In [None]:
df_words.printSchema()

In [None]:
df_wordCounts = df_words.groupBy("word").count()
df_wordCounts

In [None]:
df_wordCounts.show(4)

In [None]:
df_wordCounts.sort("word").show(8)

In [None]:
from pyspark.sql.functions import desc

In [None]:
df_wordCounts.sort(desc("count")).show(8)

### CSV files

In [None]:
df_csv = spark.read.csv('scores_data.csv.gz', header=True)
df_csv

In [None]:
# Alternative

spark.read.option("header", True).csv('scores_data.csv.gz')

In [None]:
df_csv.count()

In [None]:
df_csv.show(4)

In [None]:
df_csv.printSchema()

#### Infer the Schema

In [None]:
df_csv = spark.read.csv('scores_data.csv.gz', 
                        header=True, 
                        inferSchema=True)
df_csv

In [None]:
# Alternative

spark.read.options(header = True, inferSchema = True).csv('scores_data.csv.gz')

In [None]:
df_csv.printSchema()

In [None]:
df_csv.describe()

In [None]:
df_csv.describe().printSchema()

In [None]:
df_csv.describe(df_csv.columns[2:]).show()

### JSON files

In [None]:
df_json = spark.read.json('scores_data.json.gz')
df_json

In [None]:
df_json.count()

In [None]:
df_json.show(4)

In [None]:
df_json.printSchema()

## Operations on DataFrames


### filter(*condition*), where(*condition*)
 - filters rows using a Column expression or a SQL expression string
  - **where** is an alias for **filter**

In [None]:
df_csv.show(4)

In [None]:
df_csv.filter(df_csv.Test2 > 100)

In [None]:
df_csv.filter(df_csv.Test2 > 100).count()

In [None]:
df_csv.filter(df_csv.Test2 > 100).show(4)

In [None]:
# Using col syntax

from pyspark.sql.functions import col

In [None]:
df_csv.filter(col('Test2') > 100).show(4)

In [None]:
# SQL syntax

df_csv.filter("Test2 > 100").show(4)

#### multiple conditions AND (&), OR (|), NOT (~)

In [None]:
df_csv.where((df_csv.Section == 'B') &
             (df_csv.Test2 > 100)).show()

In [None]:
# col syntax 

df_csv.where((col('Section') == 'B') &
             (col('Test2') > 100)).show()

In [None]:
# SQL syntax 

df_csv.where("Section == 'B' AND Test2 > 100").show()

#### filter on list values

In [None]:
df_csv.filter(df_csv.Section.isin(['A','C'])).count()

In [None]:
# col syntax

df_csv.filter(col('Section').isin(['A','C'])).count()

In [None]:
# Sql syntax

df_csv.filter("Section IN ('A','C')").count()

In [None]:
df_csv.filter((df_csv.Section.isin(['A','C'])) &
              (df_csv.Test2 > 100)).show()

In [None]:
# SQL syntax

df_csv.filter("Section IN ('A','C') AND Test2 > 100").show()

In [None]:
# Not in the list

df_csv.filter((~df_csv.Section.isin(['A','C'])) &
              (df_csv.Test2 > 100)).show()

In [None]:
# Same as

df_csv.filter((df_csv.Section.isin(['A','C']) == False) &
              (df_csv.Test2 > 100)).show()

In [None]:
# SQL syntax

df_csv.filter("Section NOT IN ('A','C') AND Test2 > 100").show()

#### startswith, endswith, contains

In [None]:
df_text.show(4)

In [None]:
df_text.filter(df_text.value.startswith(
    'Complex')).show(truncate=False)

In [None]:
df_text.filter(df_text.value.endswith(
    'it.')).show(truncate=False)

In [None]:
df_text.filter(df_text.value.contains(
    'better')).show(truncate=False)

In [None]:
# not contains

df_text.filter(~df_text.value.contains(
    'better')).show(truncate=False)

#### between

In [None]:
df1.show()

In [None]:
df1.filter(col('d').between('2022-01-15', '2022-02-15')).show()

In [None]:
df1.filter(df1.a.between(10,20)).show()

In [None]:
# not in between

In [None]:
df1.filter(~df1.a.between(10,20)).show()

#### like, rlike (regular expression syntax)

In [None]:
df_csv.filter(df_csv.Student.like('Student_99999%')).show()

In [None]:
df_text.filter(df_text.value.like(
    '%tly%')).show(truncate=False)

In [None]:
df_text.filter(df_text.value.like(
    '%complex%')).show(truncate=False)

In [None]:
# regex (rlike)

df_text.filter(df_text.value.rlike(
    '^I')).show(truncate=False)

In [None]:
df_text.filter(df_text.value.rlike(
    's.$')).show(truncate=False)

In [None]:
df_text.filter(df_text.value.rlike(
    'complex')).show(truncate=False)

In [None]:
# case insensitive

df_text.filter(df_text.value.rlike(
    '(?i)complex')).show(truncate=False)

#### array column

In [None]:
df_wordList.show(4, truncate=False)

In [None]:
df_wordList.take(4)

In [None]:
from pyspark.sql.functions import array_contains

In [None]:
df_wordList.filter(array_contains(
    df_wordList.wordList, 'better')).show(truncate=False)

### groupBy, groupby
  - Groups the DataFrame using the specified columns
    - used for aggregation on the groups

In [None]:
df_csv.groupBy()

In [None]:
df_csv.groupBy().avg()

In [None]:
df_csv.groupBy().avg().show()

In [None]:
# alternatively, mean is alias for avg

df_csv.groupBy().mean().show()

#### Rounding columns

In [None]:
from pyspark.sql.functions import round

In [None]:
result = df_csv.groupBy().avg()
result

In [None]:
# Round every aggregate to 2 decimals in a single projection.
# `round` is pyspark.sql.functions.round (imported in the cell above), not the
# Python built-in. One select() replaces the per-column withColumn() loop,
# which adds a new projection to the query plan on every iteration.
result = result.select(
    [round(name, 2).alias(name) for name in result.columns]
)

result.show()

#### Group by Column

In [None]:
result = df_csv.groupBy('Section').avg()
result

In [None]:
# Keep the grouping column (first column) untouched and round the remaining
# aggregate columns to 2 decimals in one projection.
# `round` is pyspark.sql.functions.round, not the Python built-in. A single
# select() avoids stacking one withColumn() projection per column.
result = result.select(
    result.columns[0],
    *[round(name, 2).alias(name) for name in result.columns[1:]]
)

result.show()

In [None]:
df_csv.groupBy('Section').min().show()

In [None]:
df_csv.groupBy('Section').max().show()

In [None]:
df_csv.groupBy('Section').count().show()

In [None]:
df_csv.groupBy('Section').sum().show()

In [None]:
# For specific column

df_csv.groupBy('Section').sum('Test3').show()

In [None]:
# For multiple columns

df_csv.groupBy('Section').min('Test1', 'Test3').show()

In [None]:
# Rename columns

df_csv.groupBy('Section').min('Test1', 'Test3').select(
    'Section', 
    col('min(Test1)').alias('min_Test1'),
    col('min(Test3)').alias('min_Test3')
).show()

#### Multiple aggregations - agg()

In [None]:
# NOTE: these names shadow Python's built-in sum, min and max for the rest of
# the notebook; after this cell, sum(...)/min(...)/max(...) refer to the Spark
# column functions, not the built-ins.
from pyspark.sql.functions import sum, min, max, avg, mean, count, countDistinct

In [None]:
df_csv.groupBy('Section').agg(min('Test1')).show()

In [None]:
df_csv.groupBy('Section').agg(
    min('Test1'), max('Test2'), avg('Test3'), 
    count('Test4'), countDistinct('Test4')).show()

In [None]:
df_csv.groupBy('Section').agg(
    min('Test1').alias('min_test1'), 
    max('Test2').alias('max_test2'), 
    avg('Test3').alias('avg_test3'),
    count('Test4').alias('count_test4'),
    countDistinct('Test4').alias('countDistinct_test4')).show()

- dictionary mapping - column to aggregate function

In [None]:
df_csv.groupBy('Section').agg({'*' : 'count'}).show()

In [None]:
df_csv.groupBy('Section').agg(
    {'Test1' : 'min', 'Test2' : 'max', 
     'Test4': 'count'}).show()

In [None]:
# limitation - can't map same column to more than one function
# A Python dict literal keeps only the last value given for a repeated key, so
# 'Test4': 'count' is silently replaced by 'Test4': 'sum' before agg() sees it.

df_csv.groupBy('Section').agg(
    {'Test1' : 'min', 'Test2' : 'max', 
     'Test4': 'count', 'Test4': 'sum'}).show()

In [None]:
df_csv.select('*', 
              col('Test4').alias(
                  'Test4_2')).groupBy('Section').agg(
    {'Test1' : 'min', 'Test2' : 'max', 
     'Test4': 'count', 'Test4_2': 'sum'}).show()

#### with filter

In [None]:
df_csv.groupBy('Section').agg(
    min('Test1').alias('min_test1'), 
    max('Test2').alias('max_test2'), 
    avg('Test3').alias('avg_test3'),
    count('Test4').alias('count_test4')).show()

In [None]:
df_csv.groupBy('Section').agg(
    min('Test1').alias('min_test1'), 
    max('Test2').alias('max_test2'), 
    avg('Test3').alias('avg_test3'),
    count('Test4').alias('count_test4')
).filter('min_test1 > 5 AND max_test2 > 100').show()

#### Grouping by multiple columns

In [None]:
df_json.show(4)

In [None]:
df_json.groupBy('Section', 'Gender').count().show()

In [None]:
# orderBy

df_json.groupBy(
    'Section', 'Gender').count().orderBy('Section').show()

In [None]:
df_json.groupBy(
    'Section', 'Gender').count().orderBy(
    'Section', 'Gender').show()

In [None]:
df_json.groupBy('Section', 'Gender').min(
    'Test1').show()

In [None]:
sorted(df_json.groupBy('Section', 'Gender').min(
    'Test1').collect())

In [None]:
df_json.groupBy('Section', 'Gender').min(
    'Test1').sort('Section')

In [None]:
df_json.groupBy('Section', 'Gender').min(
    'Test1').sort('Section').show()

In [None]:
df_json.groupBy('Section', 'Gender').min(
    'Test1').sort('Section', 'Gender').show()

In [None]:
from pyspark.sql.functions import desc

In [None]:
df_json.groupBy('Section', 'Gender').min(
    'Test1').sort('Section', desc('Gender')).show()

### union, intersect, subtract
  - returns a new DataFrame with 
     - union of rows in both (union)
     - rows present in both (intersect)
     - rows only in this but not the other (subtract)

In [None]:
# Small (Name, Age) table used by the union / intersect / subtract examples.
df_A = spark.createDataFrame(
    data=[
        ('Alice', 21),
        ('Bob', 22),
        ('Dave', 24),
    ],
    schema=['Name', 'Age'],
)
df_A.show()

In [None]:
df_B = spark.createDataFrame(data = 
                            [('Bob', 22),('Dave', 26), ('Ed', 30)],
                            schema = ['Name', 'Age'])
df_B.show()

In [None]:
df_A.union(df_B).show()

In [None]:
# Remove duplicates using distinct()

df_A.union(df_B).distinct().show()

In [None]:
df_A.intersect(df_B).show()

In [None]:
df_A.subtract(df_B).show()

In [None]:
df_B.subtract(df_A).show()

### join, crossJoin

In [None]:
df_A = spark.createDataFrame(
    data = 
    [('Alice', 21),('Bob', 22),('Dave', 24), ('Frank', 20)],
    schema = ['Name', 'Age'])

df_A.show()

In [None]:
df_B = spark.createDataFrame(
    data = 
    [('Bob', 50000),('Dave', 60000), ('Ed', 70000)],
    schema = ['Name', 'Salary'])

df_B.show()

In [None]:
df_A.join(df_B)

In [None]:
df_A.join(df_B).show()

In [None]:
df_A.join(df_B).explain()

In [None]:
# cartesian product

df_A.crossJoin(df_B).show()

- join(otherDF, on, how)
- Join on column - default inner join

In [None]:
df_A.show()
df_B.show()

In [None]:
df_A.join(df_B, on = 'Name', how = 'inner').show()

In [None]:
df_A.join(df_B, on = 'Name', how = 'outer').show()

In [None]:
df_A.join(df_B, on = 'Name', how = 'left').show()

In [None]:
df_A.join(df_B, on = 'Name', how = 'right').show()

#### Left Semi Join
 - Similar to inner join
 - Returns all columns from left dataframe and ignores columns from right dataframe

In [None]:
df_A.show()
df_B.show()

In [None]:
df_A.join(df_B, on = 'Name', how='leftsemi').show()

#### Left Anti Join
 - Opposite of Left Semi join
 - Returns only columns from left dataframe for non-matched records

In [None]:
df_A.join(df_B, on = 'Name', how='leftanti').show()

#### Join using different column names

In [None]:
df_C = spark.createDataFrame(
    data = 
    [('Bob', 50000),('Dave', 60000), ('Ed', 70000)],
    schema = ['FirstName', 'Salary'])

df_C.show()

In [None]:
df_A.join(df_C, on = df_A.Name == df_C.FirstName, 
          how = 'inner').show()

In [None]:
df_A.join(df_C, on = df_A.Name == df_C.FirstName, 
          how = 'inner').select(
    df_C.FirstName, df_C.Salary, df_A.Age).show()

In [None]:
# Or,

df_A.join(df_C, on = df_A.Name == df_C.FirstName, 
          how = 'inner').drop(df_A.Name).show()

#### Self join
  - e.g., who are older than each person

In [None]:
df_A.show()

In [None]:
df_A.join(df_A).show()

In [None]:
df_A.alias('df1').join(df_A.alias('df2')
                      ).filter(
    col("df1.Age") < col("df2.Age")).show()

In [None]:
# Self join: pair each person (alias df1) with every person (alias df2) whose
# age is strictly greater, then give the four output columns readable names.
(
    df_A.alias('df1')
    .join(df_A.alias('df2'))
    .filter(col("df1.Age") < col("df2.Age"))
    .select(
        col("df1.Name").alias("Name"),
        col("df1.Age").alias("Age"),
        col("df2.Name").alias("OlderPersonName"),
        col("df2.Age").alias("OlderPersonAge"),
    )
    .show()
)

### crosstab(*col1, col2*)
  - two way frequency table
  - computes a pair-wise frequency table of the given columns
  - Also known as a contingency table

In [None]:
df_json.show()

In [None]:
df_json.crosstab('Section', 'Gender')

In [None]:
df_json.crosstab('Section', 'Gender').show()

In [None]:
# with groupBy

df_json.groupBy('Section', 'Gender').count().show()

In [None]:
df_json.filter('Test1 > 75 AND Test4 > 65').count()

In [None]:
df_json.filter('Test1 > 75 AND Test4 > 65').crosstab('Test1', 'Test4').show()

In [None]:
# With Group by
df_json.filter('Test1 > 75 AND Test4 > 65').groupBy('Test1', 'Test4').count().show()

### pivot
  - Pivots a column of the current DataFrame and performs the specified aggregation

In [None]:
df_json.printSchema()

In [None]:
df_json.groupBy('Section').pivot('Gender').count().show()

In [None]:
df_json.groupBy('Gender').pivot('Section').count().show()

In [None]:
df_json.groupBy('Section').pivot('Gender').max('Test1').show()

In [None]:
df_json.groupBy('Gender').pivot('Section').max('Test1').show()

In [None]:
df_json.groupBy('Section').pivot('Gender').agg(
    {'Test1': 'max', 'Test2': 'min'}).show()

In [None]:
# Compare with

df_json.groupBy('Section','Gender').agg(
    {'Test1': 'max', 'Test2': 'min'}).show()

- Pivot with specified values  (more efficient)

In [None]:
df_json.groupBy('Gender').pivot('Section', ['A', 'C']).max('Test1').show()

In [None]:
df_json.groupBy('Gender').pivot('Section', ['A', 'C']).min().show()

### cube
 - Creates a multi-dimensional cube for the current DataFrame using the specified columns, so we can run aggregations on them
 - Creates combinations of all values in all listed columns

In [None]:
df_json.cube('Section', 'Gender')

In [None]:
df_json.cube('Section', 'Gender').count().show()

In [None]:
df_json.cube('Section', 'Gender'
            ).count().orderBy('Section', 'Gender').show()

In [None]:
df_json.cube('Section', 'Gender'
            ).max('Test1').orderBy('Section', 'Gender').show()

In [None]:
df_json.cube('Test3','Section', 'Gender').count().orderBy(
    'Test3', 'Section', 'Gender').show(40)

In [None]:
df_json.cube('Test3','Section', 'Gender').count().orderBy(
    'Test3', 'Section', 'Gender').count()

In [None]:
df_json.cube('Test3','Section', 'Gender').agg({'Test1': 'max', 'Test2': 'min'}).orderBy(
    'Test3', 'Section', 'Gender').show(40)

### rollup
  - Create a multi-dimensional rollup for the current DataFrame using the specified columns, so we can run aggregation on them
  - Rollup returns a hierarchy of values using the given columns starting from the first given column.

In [None]:
df_json.rollup('Section', 'Gender')

In [None]:
df_json.rollup('Section', 'Gender'
            ).count().show()

In [None]:
# compare with cube

df_json.cube('Section', 'Gender'
            ).count().show()

In [None]:
df_json.rollup('Section', 'Gender'
            ).count().orderBy('Section', 'Gender').show()

In [None]:
# compare with cube

df_json.cube('Section', 'Gender'
            ).count().orderBy('Section', 'Gender').show()

In [None]:
df_json.rollup('Test3','Section', 'Gender').count().orderBy(
    'Test3', 'Section', 'Gender').show(40)

In [None]:
df_json.rollup('Test3','Section', 'Gender').count().orderBy(
    'Test3', 'Section', 'Gender').count()

### Cube vs Rollup
  - Assume grouping by three columns
  - Year, Month, Day
    - Calculate number of rows

| Cube                | Rollup                  | SQL |
| :-----------        | :-----------             |:-----|
| Year, Month, Day    | Year, Month, Day        |SELECT COUNT(*) FROM table GROUP BY Year, Month, Day|
| Year, Month         | Year, Month            |SELECT COUNT(*) FROM table GROUP BY Year, Month|
| Year, Day         | -            |SELECT COUNT(*) FROM table GROUP BY Year, Day|
| Year             | Year           |SELECT COUNT(*) FROM table GROUP BY Year|
| Month, Day         | -            |SELECT COUNT(*) FROM table GROUP BY Month, Day|
| Month         | -            |SELECT COUNT(*) FROM table GROUP BY Month|
| Day         | -            |SELECT COUNT(*) FROM table GROUP BY Day|
|null, null, null| null, null, null     | SELECT COUNT(*) FROM table |


### Sorting

In [None]:
df_D = spark.createDataFrame(
    data = 
    [('Alice', 21),('Bob', 22),
     ('Dave', 24), ('Frank', 20), 
     ('Charlie', 22), ('Ed', 24)],
    schema = ['Name', 'Age'])

df_D.show()

In [None]:
df_D.sort(df_D.Name.desc()).show()

In [None]:
df_D.sort(df_D.Age).show()

In [None]:
df_D.sort(df_D.Age, df_D.Name).show()

In [None]:
df_D.sort(df_D.Age.desc()).show()

In [None]:
df_D.sort(df_D.Age.desc(), df_D.Name).show()

In [None]:
from pyspark.sql.functions import asc, desc

In [None]:
df_D.show()

In [None]:
df_D.sort(desc('Name')).show()

In [None]:
df_D.sort(desc('Age')).show()

In [None]:
df_D.sort(desc('Age'), asc('Name')).show()

### describe(), summary()
  - The summary() and describe() transformations produce similar descriptive statistics, with the summary() transformation additionally producing quartiles.

In [None]:
df_json.describe()

In [None]:
df_json.describe().show()

In [None]:
df_json.summary()

In [None]:
df_json.summary().show()

### foreach
  - applies the specified function for each row of the DataFrame

In [None]:
df_A.show()

In [None]:
# NOTE(review): foreach() applies the function to each row as a Spark action,
# so the print() output presumably goes to the stdout of the process running
# the task (e.g. the terminal that launched Jupyter) rather than to this
# cell's output area — confirm in your environment.
df_A.foreach(lambda row: print(row))

In [None]:
df_A.foreach(lambda row: 
             print("{:8s} is {} years old".format(
                 row['Name'], row['Age'])))

## Stop the session

In [None]:
spark.stop()