In [None]:
from glob import glob
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
import pandas as pd

In [None]:
# Create a session
spark = SparkSession.builder.appName("PySpark Introduction").getOrCreate()

In [None]:
CSV_PATH = './data/datacamp_ecommerce.csv'

In [None]:
df = spark.read.csv(CSV_PATH)
df

In [None]:
df.show()

In [None]:
df = spark.read.csv(CSV_PATH, header=True)
df

In [None]:
df.show()

In [None]:
schema = StructType([
    StructField("InvoiceNo", StringType(), True),
    StructField("StockCode", StringType(), True),
    StructField("Description", StringType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("InvoiceDate", StringType(), True),
    StructField("UnitPrice", DoubleType(), True),
    StructField("CustomerID", StringType(), True),
    StructField("Country", StringType(), True)
])

df = spark.read.csv(CSV_PATH, header=True, schema=schema)
df

In [None]:
df.show()

In [None]:
df.count()

## 1. How many unique customers are present in the dataframe?

In [None]:
# `distinct()` keeps NULL as a value of its own, so rows without a CustomerID
# (the column is declared nullable) would be counted as one extra "customer".
# Drop them before counting.
df.where(df.CustomerID.isNotNull()).select('CustomerID').distinct().count()

## 2. What country do most purchases come from?

In [None]:
from pyspark.sql.functions import countDistinct

df.groupBy('Country').agg(countDistinct('InvoiceNo').alias('country_count')).show()

In [None]:
from pyspark.sql.functions import desc

# with sort
df.groupBy('Country').agg(countDistinct('InvoiceNo').alias('country_count')).orderBy(desc('country_count')).show()

## 3. When were the earliest and the most recent purchases made on the platform?

In [None]:
df.dtypes

In [None]:
from pyspark.sql.functions import to_timestamp

spark.sql('set spark.sql.legacy.timeParserPolicy=LEGACY')
df = df.withColumn('InvoiceDate', to_timestamp('InvoiceDate', 'MM/dd/yyyy HH:mm'))
df.show()

In [None]:
df.dtypes

In [None]:
from pyspark.sql.functions import min as sql_func_min, max as sql_func_max

df.select(sql_func_min("InvoiceDate")).show()

In [None]:
df.select(sql_func_max("InvoiceDate")).show()

## 4. What were the highest and lowest purchase (invoice) totals on the platform?

In [None]:
df.show()

In [None]:
df = df.withColumn('TotalPrice', df.Quantity * df.UnitPrice)
df.show()

In [None]:
df.dtypes

In [None]:
from pyspark.sql.functions import sum as sql_func_sum, asc

df.groupby('InvoiceNo') \
    .agg(sql_func_sum(df.Quantity * df.UnitPrice).alias('InvoiceTotalPrice')) \
    .orderBy(asc('InvoiceTotalPrice')).show()

In [None]:
df.groupby('InvoiceNo') \
    .agg(sql_func_sum('TotalPrice').alias('InvoiceTotalPrice')) \
    .orderBy(desc('InvoiceTotalPrice')).show()

## 5. Other syntax

### 5.1. SQL

In [None]:
df.createOrReplaceTempView("CUSTOMER_DATA")
sql_df = spark.sql("SELECT * from CUSTOMER_DATA")
sql_df.show()

In [None]:
sql_df = spark.sql(
    '''
    SELECT InvoiceDate, CustomerID, TotalPrice
    FROM CUSTOMER_DATA
    WHERE CustomerID == 13047
    '''
)
sql_df.show()

### 5.2. Parquet files

In [None]:
!ls ./data/dummy_parquet_dataset/part-00000.parquet

In [None]:
PARQUET_PATHS = glob('./data/dummy_parquet_dataset/*.parquet')
PARQUET_PATHS

In [None]:
par_df = spark.read.parquet(PARQUET_PATHS[0])
par_df.show()

In [None]:
par_df.count()

#### PySpark vs Pandas

1. Read and concatenate 2 Parquet files
2. Drop the `URL` column
3. Keep rows whose `similarity` is greater than 0.4
4. Create a new `image_size` column as `HEIGHT * WIDTH`

In [None]:
# One Spark DataFrame per Parquet file, in PARQUET_PATHS order.
spark_df_list = [spark.read.parquet(parquet_path) for parquet_path in PARQUET_PATHS]
# Row count of the first file on its own.
spark_df_list[0].count()

In [None]:
from pyspark.sql import DataFrame
from functools import reduce

In [None]:
spark_df = reduce(DataFrame.unionAll, spark_df_list)

In [None]:
spark_df = spark_df.drop('URL')
spark_df.show()

In [None]:
spark_df = spark_df.filter(spark_df.similarity > 0.4)
spark_df.show()

In [None]:
spark_df.count()

In [None]:
spark_df = spark_df.withColumn('image_size', spark_df.HEIGHT * spark_df.WIDTH)
spark_df.show()

In [None]:
from pyspark.sql.functions import col

In [None]:
%%timeit

spark_df = reduce(DataFrame.unionAll, [spark.read.parquet(p) for p in PARQUET_PATHS]) \
    .drop('URL') \
    .filter(col('similarity') > 0.4) \
    .withColumn('image_size', col('HEIGHT') * col('WIDTH'))
spark_df.show()

In [None]:
spark_df.count()

``` bash
pip install pyarrow
```
so that pandas can read Parquet files (`pd.read_parquet` needs a Parquet engine such as `pyarrow`).

In [None]:
# Uncomment to install the Parquet engine that pd.read_parquet needs.
# `%pip` installs into this kernel's environment (unlike a bare `pip`/`!pip`):
# %pip install pyarrow

In [None]:
import pandas as pd
from glob import glob

In [None]:
%%timeit

df = pd.concat([
    pd.read_parquet(path) for path in PARQUET_PATHS
])
df = df.drop(columns=['URL'])
df = df.loc[df.similarity > 0.4]
df['image_size'] = df.HEIGHT * df.WIDTH

In [None]:
# %%timeit runs its cell in a separate scope, so the pandas frame built in the
# timed cell is not kept, and `df` would still be the Spark e-commerce DataFrame.
# Rebuild the pandas result once, under its own name, for inspection.
pandas_df = (
    pd.concat([pd.read_parquet(path) for path in PARQUET_PATHS])
    .drop(columns=['URL'])
    .loc[lambda frame: frame.similarity > 0.4]
    .assign(image_size=lambda frame: frame.HEIGHT * frame.WIDTH)
)
pandas_df

In [None]:
spark_df.write.parquet('output_1_pyspark')

In [None]:
spark_df.coalesce(1).write.parquet('output_2_pyspark')