# Test Spark sql
This library is similar to pandas, which can be used to handle the dataframe data and connecting with database.

In [36]:
from pyspark.sql import SparkSession
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row

spark = SparkSession.builder.getOrCreate()

pandas_df = pd.DataFrame({
    'a': [1, 2, 3],
    'b': [2., 3., 4.],
    'c': ['string1', 'string2', 'string3'],
    'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
    'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})
df = spark.createDataFrame(pandas_df)
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
df.show()


+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



In [13]:
# take col
df.select('a')
# If turn the conf into true, then you dont need to use show method
#df['a'].show() # This will give you a warning
# This is a column instance

# take row
df.take(1)

[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0))]

In [16]:
# Using `column` instance to select a dataframe from dataframe
# These 2 ways are the same
df.select(df.c)
df.select('c')


c
string1
string2
string3


In [18]:
# Add new col
# If need more funciton, we can use functions (this package)
from pyspark.sql.functions import upper
df.withColumn('upper_c', upper(df.c))

a,b,c,d,e,upper_c
1,2.0,string1,2000-01-01,2000-01-01 12:00:00,STRING1
2,3.0,string2,2000-02-01,2000-01-02 12:00:00,STRING2
3,4.0,string3,2000-03-01,2000-01-03 12:00:00,STRING3


In [25]:
# Using filter to filter the data rather directly use 
df.filter(df.a == 1)
#df.select(df.select('a') == 1)
# No method 

a,b,c,d,e
1,2.0,string1,2000-01-01,2000-01-01 12:00:00


## Apply native python functions

In [39]:
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf('long')
def pandas_plus_one(series: pd.Series) -> pd.Series:
    # Simply plus one by using pandas Series.
    return series + 1
type(pandas_plus_one(df.a))
type(df.a)
# df.a is a column type which is the same as seires?
# Why this method return a pd.series but is a column type?


# 没看懂！
def pandas_filter_func(iterator):
    for pandas_df in iterator:
        yield pandas_df[pandas_df.a == 1]

df.mapInPandas(pandas_filter_func, schema=df.schema).show()


+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+



## Group Data

In [32]:
df = spark.createDataFrame([
    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])
df
# 在我这个spark对象上创建的所有dataframe 都可以直接显示，拥有相同配置

color,fruit,v1,v2
red,banana,1,10
blue,banana,2,20
red,carrot,3,30
blue,grape,4,40
red,carrot,5,50
black,carrot,6,60
red,banana,7,70
red,grape,8,80


In [34]:
df.groupBy('color').avg()


## Using functions in pandas
def plus_mean(pandas_df):
    return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())

df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show()

                                                                                

color,avg(v1),avg(v2)
red,4.8,48.0
blue,3.0,30.0
black,6.0,60.0


## Get data in/out

In [40]:
df.write.csv('foo.csv', header=True)
spark.read.csv('foo.csv', header=True).show()

                                                                                

+---+---+-------+----------+--------------------+
|  a|  b|      c|         d|                   e|
+---+---+-------+----------+--------------------+
|  3|4.0|string3|2000-03-01|2000-01-03T12:00:...|
|  2|3.0|string2|2000-02-01|2000-01-02T12:00:...|
|  1|2.0|string1|2000-01-01|2000-01-01T12:00:...|
+---+---+-------+----------+--------------------+

