In [None]:
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row



In [None]:
import os

In [None]:
# Set before the SparkSession is created further down.
# NOTE(review): presumably needed by pyspark.pandas when running with a recent
# PyArrow (it otherwise warns about timezone handling) -- confirm against the
# pandas-on-Spark documentation.
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"

In [None]:
from pyspark.sql import SparkSession

# Start (or reuse) a SparkSession named "myApp" that runs locally on all
# available cores, binds the driver to the loopback address and uses the
# Hive catalog implementation.
spark = (
    SparkSession.builder
    .appName("myApp")
    .master("local[*]")
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.sql.catalogImplementation", "hive")
    .getOrCreate()
)

The Spark web UI (dashboard) is available at: [http://localhost:4040/](http://localhost:4040/)

In [None]:
# Build a three-row Spark DataFrame from explicit Row objects with an
# integer, a float, a string, a date and a timestamp column.
sample_rows = [
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0)),
]
df = spark.createDataFrame(sample_rows)
df

In [None]:
# Execute the DataFrame and print its rows as a plain-text table.
df.show()

In [None]:
# Turn on eager evaluation for the REPL/notebook so that a bare DataFrame
# expression (like `df` below) renders its rows rather than only its schema.
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
df

## Create a Dataframe from pandas

In [None]:
# Assemble the column data first, wrap it in a pandas DataFrame, then hand
# that pandas DataFrame to Spark to obtain a Spark DataFrame.
pandas_columns = {
    'a': [1, 2, 3],
    'b': [2., 3., 4.],
    'c': ['string1', 'string2', 'string3'],
    'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
    'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)],
}
pandas_df = pd.DataFrame(pandas_columns)
df = spark.createDataFrame(pandas_df)
df

In [None]:
# Print the column names and their Spark data types.
df.printSchema()

In [None]:
# List the column names of the DataFrame.
df.columns

In [None]:
# Compute basic summary statistics for the DataFrame's columns.
df.describe()

## Selecting and Accessing Data

In [None]:
# Attribute access yields a Column expression object for column `a`,
# not the column's values.
df.a

In [None]:
from pyspark.sql import Column
from pyspark.sql.functions import upper

In [None]:
# Chained comparison: True only if a plain column reference, the result of
# upper() and the result of isNull() all have the same Python type.
type(df.c) == type(upper(df.c)) == type(df.c.isNull())

In [None]:
# Project only columns c and a, in that order.
df.select(df.c, df.a)

In [None]:
# Create a new column 'upper_c' holding upper(c).
# The result is only displayed; `df` itself is not reassigned here.
df.withColumn('upper_c', upper(df.c))

In [None]:
# Selection on rows: keep only the rows where column a equals 1.
df.filter(df.a == 1)

## Applying a Function

In [None]:
# Show the schema of df again.
# TODO(review): this section currently contains no example that actually
# applies a function to the DataFrame.
df.printSchema()

## Grouping Data

In [None]:
# Sample data for the grouping examples: one row per (color, fruit) observation
# with two numeric value columns.
fruit_rows = [
    ['red', 'banana', 1, 10],
    ['blue', 'banana', 2, 20],
    ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40],
    ['red', 'carrot', 5, 50],
    ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70],
    ['red', 'grape', 8, 80],
]
df = spark.createDataFrame(fruit_rows, schema=['color', 'fruit', 'v1', 'v2'])
df

In [None]:
# Average of v1 within each color group.
df.groupby('color').avg("v1")

In [None]:
from pyspark.sql.functions import sum, avg

In [None]:
# Several aggregations per color group in one pass, each with an explicit
# output column name.
# NOTE: `sum` and `avg` here are the pyspark.sql.functions versions imported in
# the previous cell; that import shadows Python's builtin `sum` from then on.
df.groupby('color').agg(sum("v2").alias("v2 sum"), avg("v1").alias("v1 avg"))

## Getting Data in/out

In [None]:
# Write the DataFrame as CSV (with a header row) to the path 'foo.csv'.
# mode='overwrite' replaces output left behind by a previous run; with the
# default save mode the write raises when the path already exists, which
# makes the notebook fail on a second Restart & Run All.
df.write.csv('foo.csv', header=True, mode='overwrite')

In [None]:
# Read the CSV output back, taking column names from the header line.
# No schema and no inferSchema option is passed here.
spark.read.csv('foo.csv', header=True)

In [None]:
# Write the DataFrame in Parquet format to the path 'bar.parquet'.
# mode='overwrite' replaces output left behind by a previous run; with the
# default save mode the write raises when the path already exists, which
# makes the notebook fail on a second Restart & Run All.
df.write.parquet('bar.parquet', mode='overwrite')

In [None]:
# Read the Parquet output back into a Spark DataFrame.
spark.read.parquet('bar.parquet')

## Working with SQL

In [None]:
# Display the DataFrame that the SQL examples below operate on.
df

DataFrame and Spark SQL share the same execution engine, so they can be used interchangeably and seamlessly. For example, you can register a DataFrame as a temporary view and run SQL queries against it, as shown below.

In [None]:
# Register df as a temporary view named "tableA" so SQL queries can refer to it;
# an existing view with that name is replaced.
df.createOrReplaceTempView("tableA")

In [None]:
# Count the rows of the tableA view with SQL.
spark.sql("SELECT count(*) from tableA")

In [None]:
# Select every row and column of the tableA view with SQL.
spark.sql("SELECT * from tableA")

In [None]:
# Materialise the contents of the tableA view as a catalog table named tableB.
# The session uses the Hive catalog, so tableB can outlive the kernel; drop any
# copy left by a previous run first, otherwise CREATE TABLE fails because the
# table already exists and the notebook cannot be re-run top to bottom.
spark.sql("DROP TABLE IF EXISTS tableB")
spark.sql("CREATE TABLE tableB AS SELECT * FROM tableA")

In [None]:
# Query the tableB table created from tableA in the preceding cell.
spark.sql("SELECT * from tableB")

## Pandas on Spark API

In [None]:
import pyspark.pandas as ps

In [None]:
import numpy as np

In [None]:
# Convert the Spark DataFrame into a pandas-on-Spark DataFrame.
# DataFrame.pandas_api() is the supported spelling; the older
# to_pandas_on_spark() alias is deprecated and absent from newer Spark releases.
psdf = df.pandas_api()
psdf

In [None]:
# Class of the original Spark DataFrame, for comparison with psdf below.
type(df)

In [None]:
# Class of the object returned by the conversion above.
type(psdf)

In [None]:
# Build a pandas-on-Spark DataFrame directly from Python data, with an
# explicit integer index, the same way one would construct a pandas DataFrame.
column_values = {
    'a': [1, 2, 3, 4, 5, 6],
    'b': [100, 200, 300, 400, 500, 600],
    'c': ["one", "two", "three", "four", "five", "six"],
}
row_labels = [10, 20, 30, 40, 50, 60]
psdf = ps.DataFrame(column_values, index=row_labels)
psdf

In [None]:
# Class of the DataFrame built with ps.DataFrame above.
type(psdf)

### Creating a pyspark.pandas Dataframe from a pandas dataframe

In [None]:
# Six consecutive timestamps starting at 2013-01-01; used as the index of the
# pandas DataFrame built in the next cell.
dates = pd.date_range('20130101', periods=6)
dates

In [None]:
# Build a 6x4 pandas DataFrame of standard-normal samples, indexed by `dates`
# with columns A-D.
# A locally seeded Generator replaces the unseeded global np.random.randn so
# the displayed values are the same after every Restart & Run All.
rng = np.random.default_rng(42)
pdf = pd.DataFrame(rng.standard_normal((6, 4)), index=dates, columns=list('ABCD'))
pdf

In [None]:
# Class of the plain pandas DataFrame, before conversion.
type(pdf)

In [None]:
# Convert the pandas DataFrame into a pandas-on-Spark DataFrame.
psdf = ps.from_pandas(pdf)
psdf

In [None]:
# Class of the object returned by ps.from_pandas.
type(psdf)

In [None]:
# Show the first rows of the pandas-on-Spark DataFrame.
psdf.head()

In [None]:
# Summary statistics of the pandas-on-Spark DataFrame's columns.
psdf.describe()

## Grouping data in a pandas-on-Spark DataFrame

In [None]:
# Sample pandas-on-Spark DataFrame for the group-by example: two string key
# columns (A, B) and two columns of standard-normal samples (C, D).
# A locally seeded Generator replaces the unseeded global np.random.randn so
# the values (and the sums and plot derived from them) are reproducible.
rng = np.random.default_rng(42)
psdf = ps.DataFrame({'A': ['foo', 'bar', 'foo', 'bar',
                          'foo', 'bar', 'foo', 'foo'],
                    'B': ['one', 'one', 'two', 'three',
                          'two', 'two', 'one', 'three'],
                    'C': rng.standard_normal(8),
                    'D': rng.standard_normal(8)})
psdf

In [None]:
# Group the rows by the value of column A and sum within each group.
psdf.groupby('A').sum()

## Plot Data

In [None]:
# Plot the values of column C.
psdf.C.plot()

## Getting data in/out (Using Pandas API)

In [None]:
# Write the pandas-on-Spark DataFrame as CSV to the path 'foo2.csv'.
psdf.to_csv('foo2.csv')

In [None]:
# Read the CSV output back with the pandas-on-Spark reader and show up to 10 rows.
ps.read_csv('foo2.csv').head(10)

In [None]:
# Round-trip through Parquet: write psdf to 'bar2.parquet', then read it back
# and show up to 10 rows.
psdf.to_parquet('bar2.parquet')
ps.read_parquet('bar2.parquet').head(10)

In [None]:
# You can use all the data sources supported by Spark IO.
# Here: write psdf in ORC format to 'zoo.orc', then read it back and show
# up to 10 rows.
psdf.to_spark_io('zoo.orc', format="orc")
ps.read_spark_io('zoo.orc', format="orc").head(10)