# Spark DataFrames

Use Spark DataFrames rather than RDDs whenever possible. In general, Spark DataFrames are more performant, and their performance is consistent across the different language APIs. Unlike RDD operations, which Spark executes exactly as you write them, DataFrame operations are first compiled by the Catalyst optimiser into an optimised execution plan, which the engine then runs. Because every language API compiles to the same execution plan, there is no performance difference across languages (unless you use user-defined functions, or UDFs).

In [None]:
spark.version

## RDDs and DataFrames

In [None]:
data = [('ann', 'spring', 'math', 98),
        ('ann', 'fall', 'bio', 50),
        ('bob', 'spring', 'stats', 100),
        ('bob', 'fall', 'stats', 92),
        ('bob', 'summer', 'stats', 100),
        ('charles', 'spring', 'stats', 88),
        ('charles', 'fall', 'bio', 100)   
       ]

In [None]:
rdd = sc.parallelize(data)

In [None]:
rdd.take(3)

In [None]:
df = spark.createDataFrame(rdd, ['name', 'semester', 'subject', 'score'])

In [None]:
df.show()

In [None]:
df.show(3)

In [None]:
df.rdd.take(3)

In [None]:
df.describe().show()

## Conversion to and from pandas

`toPandas()` collects every row to the driver, so make sure your data set fits in the driver's memory before converting to `pandas`.

In [None]:
pdf = df.toPandas()
pdf

In [None]:
spark.createDataFrame(pdf).show()

## Schemas

In [None]:
df.printSchema()

## Data manipulation

### Selecting columns

In [None]:
df.select(['name', 'subject', 'score']).show()

### Filtering rows

In [None]:
df.filter(df['score'] > 90).show()

### Mutating values

In [None]:
import pyspark.sql.functions as F

Using `select`

In [None]:
df.select(df['name'], df['semester'], df['subject'], df['score'], 
          (df['score'] - 10).alias('adj_score')).show()

Using `withColumn`

In [None]:
df.withColumn('sqrt_score', F.sqrt(F.col('score'))).show()

**Version 1**: Using a user-defined function (UDF)

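Note: Python UDFs are not efficient. Rows are serialized and sent to a separate Python worker process to be evaluated, and the results are sent back to the JVM; Catalyst also cannot optimise the logic inside the function.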

In [None]:
@F.udf
def score_to_grade(g):
    if g > 90:
        return 'A'
    elif g > 80:
        return 'B'
    else:
        return 'C'

In [None]:
df.withColumn('grade', score_to_grade(df['score'])).show()

**Version 2**: Using built-in functions.

This version is more performant because the expression is evaluated inside the JVM and can be optimised by Catalyst.

In [None]:
score_to_grade_fast = (
    F.
    when(F.col('score') > 90, 'A').
    when(F.col('score') > 80, 'B').
    otherwise('C')
)

In [None]:
df.withColumn('grade_fast', score_to_grade_fast).show()

### Sorting

In [None]:
df.sort(F.col('score')).show()

In [None]:
df.sort(F.col('score').desc()).show()

Alternative syntax

In [None]:
df.sort(df.score.desc()).show()

### Summarizing

In [None]:
df.agg(
    {'score': 'mean'}
).show()

In [None]:
df.agg(
    F.mean(df.score).alias('avg'),
    F.min(df.score).alias('min'),
    F.max(df.score).alias('max')
).show()

### Split-Apply-Combine

In [None]:
df.groupby('name').agg({'score': 'mean', 'subject': 'count'}).show()

### Join

In [None]:
meta = [('ann', 'female', 23),
        ('bob', 'male', 19),
        ('charles', 'male', 22),
        ('david', 'male', 23)
       ]

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [None]:
schema = StructType([
    StructField('name', StringType(), True),
    StructField('sex', StringType(), True),
    StructField('age', IntegerType(), True)
])

In [None]:
df_meta = spark.createDataFrame(meta, schema)

In [None]:
df_meta.printSchema()

In [None]:
df_meta.show()

In [None]:
df.join(df_meta, on='name', how='inner').show()

In [None]:
df_full = df.join(df_meta, on='name', how='rightouter')
df_full.show()

In [None]:
df_full.groupby('sex').mean().show()

In [None]:
df_full.groupby('sex').pivot('subject').agg(F.mean('age')).show()

In [None]:
(
    df_full.
    dropna().
    groupby('sex').
    pivot('subject').
    agg(F.mean('age')).
    show()
)

## Using SQL

In [None]:
df_full.createOrReplaceTempView('table')

### Select columns

In [None]:
spark.sql('select name, age from table').show()

### Filter rows

In [None]:
spark.sql('select name, age from table where age > 20').show()

### Mutate

In [None]:
spark.sql('select name, age + 2 as adj_age from table').show()

### Sort

In [None]:
spark.sql('select name, age from table order by age desc').show()

### Summary

In [None]:
spark.sql('select mean(age) from table').show()

### Split-apply-combine


In [None]:
q = '''
select sex, mean(score), min(age)
from table
group by sex
'''

In [None]:
spark.sql(q).show()

## Using SQL magic

In [None]:
%%sql

select sex, mean(score), min(age)
from table
group by sex

### Capture output locally (i.e. as a pandas DataFrame in the local kernel, not in the Livy session on the cluster)

In [None]:
%%sql -q -o df1

select sex, score, age from table

In [None]:
%%local

%matplotlib inline
import matplotlib.pyplot as plt

plt.scatter(x='age', y='score', data=df1)
plt.show()

## I/O options

### CSV

In [None]:
df_full.write.mode('overwrite').option('header', 'true').csv('foo.csv')

In [None]:
foo = spark.read.option('header', 'true').csv('foo.csv')

In [None]:
foo.show()

### JSON

In [None]:
df_full.write.mode('overwrite').json('foo.json')

In [None]:
foo = spark.read.json('foo.json')

In [None]:
foo.show()

### Parquet

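Parquet is an efficient columnar storage format. It also stores the schema with the data, so column types are preserved when the file is read back.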

In [None]:
df_full.write.mode('overwrite').parquet('foo.parquet')

In [None]:
foo = spark.read.parquet('foo.parquet')

In [None]:
foo.show()

## Example

In [None]:
text = spark.read.text('/data/texts/Portrait.txt')