In [None]:
import pyspark.sql.functions as F
import pyspark as ps
from datetime import date

In [None]:
# get data: pull a small sample of customer_t for exploration.
# NOTE: LIMIT without ORDER BY returns an arbitrary set of 100 rows, so the
# sample is not guaranteed to be the same between runs.
customer_query = """
SELECT *
FROM customer_t
LIMIT 100
"""
df = spark.sql(customer_query)

Check column names and schema

In [None]:
# Column names as a Python list (equivalent to df.columns).
df.schema.fieldNames()

In [None]:
# Print the schema as a tree: each column's name, data type and nullability.
df.printSchema()

Show summary statistics (`describe`)

In [None]:
# for full table
df.describe().show()

In [None]:
# for one column: the same summary statistics, restricted to customer_id.
summary_customer_id = df.describe('customer_id')
summary_customer_id.show()

Display rows

In [None]:
# Show up to 20 distinct customer_id values (not the first 20 rows of df).
# truncate=False prints full cell contents instead of cutting them at 20 chars.
df.select('customer_id').distinct().show(20, truncate=False)

In [None]:
# show first row vertically: one "column | value" line per field, which is
# easier to read than the default table layout for wide rows.
df.show(n=1, vertical=True)

Convert to a pandas DataFrame

In [None]:
import pandas as pd

# Collect the Spark DataFrame to the driver as a pandas DataFrame.
# toPandas() pulls every row into driver memory, so use it only on small
# results (df is capped at 100 rows by the LIMIT in the load query).
df1 = df.toPandas()

# Show all rows/columns for this display only, instead of changing pandas'
# global display options for the rest of the session; display() gives the
# rich table rendering rather than print()'s plain text.
with pd.option_context('display.max_rows', None, 'display.max_columns', None):
    display(df1)

Filter rows

In [None]:
# where() is an alias of filter(); here the condition is a SQL expression string.
df.where('customer_status=1')

In [None]:
# Same condition built as a Column expression via F.col.
df.where(F.col('customer_status') == 1)

In [None]:
# Same condition using attribute-style column access on the DataFrame.
df.where(df.customer_status == 1)

In [None]:
# multiple criteria: combine Column conditions with & (and), | (or), ~ (not).
# Keep each condition in its own parentheses: & binds tighter than ==, so
# comparisons like (a == 1) & (b == 2) break without them.
# NOTE(review): assumes deceased_customer_flag is a boolean column -- confirm.
# Rows where the flag is null are dropped too, because ~null is null.
df.filter((df['customer_status'].isNotNull()) & (~df['deceased_customer_flag']))

Rename columns

In [None]:
# Returns a new DataFrame with customer_status renamed; df itself is unchanged.
df2 = df.withColumnRenamed(existing='customer_status', new='customer_status_renamed')

Aggregation

In [None]:
# Number of distinct customers per customer_status
# (groupBy is the canonical name; groupby is an alias).
df.groupBy('customer_status').agg(
    F.countDistinct('customer_id').alias('count')
)

In [None]:
# aggregate on multiple columns / with multiple functions.
# A dict cannot hold the same key twice: {'customer_id': 'count',
# 'customer_id': 'max'} silently keeps only 'max'. Use one column expression
# per aggregation instead (the dict form only works for distinct columns).
df.groupBy('customer_status').agg(
    F.count('customer_id').alias('customer_id_count'),
    F.max('customer_id').alias('customer_id_max'),
)

In [None]:
# group by multiple columns: the grouping columns may also be passed as a list.
df.groupBy(['customer_start_datetime', 'customer_status']).agg(
    F.countDistinct('customer_id').alias('count')
)

Join tables

In [None]:
# Inner join on the key column. Passing the key as a list of names keeps a
# single customer_id column in the result.
# NOTE(review): df2 is df with one column renamed, so every other non-key
# column appears twice in df3, and referencing it by name will be ambiguous.
df3 = df.join(df2, ['customer_id'], 'inner')

Create a new column

In [None]:
# Derive a new column with Column arithmetic (** maps to pow).
# NOTE: this reassigns df2, replacing the renamed-column frame used in the join.
# NOTE(review): assumes customer_id is numeric; for a string id, non-numeric
# values would likely come out null -- confirm the column type.
df2 = df.withColumn('new_column', df['customer_id'] ** 2)

In [None]:
# Conditional column: 1 where customer_id equals 1, otherwise 0.
# Comparison needs ==; a bare = inside the call is a syntax error.
# NOTE: this reassigns df2 again.
df2 = df.withColumn('new_column', F.when(df.customer_id == 1, 1).otherwise(0))

Sort data

In [None]:
# Sort ascending by customer_id. The method is orderBy (camelCase);
# df.orderby does not exist and raises an error.
df.orderBy('customer_id')

In [None]:
# Sort descending by customer_id (orderBy, camelCase).
df.orderBy(df['customer_id'].desc())

In [None]:
# sort() is an alias of orderBy(); F.col(...).asc() is the same as F.asc(...).
df.sort(F.col('customer_id').asc())

Handle missing data

In [None]:
# Drop rows with fewer than 2 non-null values (thresh overrides `how`).
# dropna() is the same operation as df.na.drop().
df.dropna(thresh=2)

In [None]:
# fill missing data for a specific column: replace nulls in customer_id with 0.
# Columns in subset whose type does not match the fill value are ignored,
# so 0 only takes effect if customer_id is numeric -- TODO confirm its type.
df.fillna(0, subset=['customer_id'])

Create a pivot table / crosstab

In [None]:
# Contingency table: one row per customer_status value, one column per
# residence_country_code value, each cell counting that pair.
df.stat.crosstab(col1='customer_status', col2='residence_country_code')

In [None]:
# Pivot: rows = customer_status, columns = each residence_country_code value,
# cells = distinct customer count; missing combinations are filled with 0.
(
    df.groupBy('customer_status')
    .pivot('residence_country_code')
    .agg(F.countDistinct('customer_id').alias('count'))
    .fillna(0)
)

Handle duplicates

In [None]:
# Remove rows that are identical across all columns
# (drop_duplicates is an alias of dropDuplicates).
df.drop_duplicates()

Create a sample DataFrame

In [None]:
# Random sample of roughly 20% of the rows.
# withReplacement=False: each row can be picked at most once.
# fraction=0.2: per-row selection probability, so the sample size is
#               approximate, not exactly 20%.
# seed=42: fixes the random generator so the sample is reproducible.
df_sample = df.sample(withReplacement=False, fraction=0.2, seed=42)

UDF (user-defined function)

In [None]:
# create a python function
def lowerCase(str):
    """Return the lower-cased input, passing None (SQL NULL) through unchanged.

    Spark hands NULL values to a Python UDF as None, so calling ``.lower()``
    on them unguarded would raise AttributeError and fail the task.

    Args:
        str: the string to convert, or None. (The name shadows the builtin
            ``str``; it is kept so existing keyword callers keep working.)

    Returns:
        The lower-cased string, or None when the input is None.
    """
    if str is None:
        return None
    return str.lower()

# convert a python function to a pyspark udf. The function can be passed
# directly (no lambda wrapper needed); 'string' is the default return type,
# spelled out for clarity.
convertUDF = F.udf(lowerCase, 'string')

# use UDF in dataframe: a UDF returns a Column, so apply it inside select()
# (DataFrame has no convertUDF attribute).
# NOTE(review): assumes customer_id is a string column; .lower() fails on
# numeric values. For plain lower-casing, the built-in F.lower is faster
# because it avoids shipping rows to a Python worker.
df.select(convertUDF(F.col('customer_id')).alias('converted_id'))

Pearson correlation

In [None]:
# Pearson correlation of two columns, returned as a one-row DataFrame.
# corr lives in pyspark.sql.functions (imported as F); a bare corr is undefined.
# NOTE(review): both columns should be numeric; residence_country_code looks
# like a string code -- confirm its type, otherwise the result is not meaningful.
df.select(F.corr('residence_country_code', 'customer_status').alias('pearson_corr'))

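Save as a table (note: `saveAsTable` writes a persistent table in the catalog, not a temporary view; use `createOrReplaceTempView` for a session-scoped view)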

In [None]:
# Persist df as a catalog table via saveAsTable (not a temporary view).
# NOTE(review): 'tem_table' may be a typo for 'temp_table'; left unchanged
# because other code may read this table by name.
table_name = 'tem_table'

# Drop any existing table first so the overwrite starts from a clean state.
# Uses the `spark` session (as in the load cell); sqlContext is the legacy entry point.
drop_table = f"DROP TABLE IF EXISTS {table_name}"
spark.sql(drop_table)

df.write.mode('overwrite').saveAsTable(table_name)