In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder\
    .master("local[*]")\
    .appName("Superstore Analysis")\
    .getOrCreate()

In [3]:
from pyspark.sql.types import *

In [4]:
"""
__all__ = [
    "DataType", "NullType", "StringType", "BinaryType", "BooleanType", "DateType",
    "TimestampType", "DecimalType", "DoubleType", "FloatType", "ByteType", "IntegerType",
    "LongType", "ShortType", "ArrayType", "MapType", "StructField", "StructType"]
"""
#create df with custom schema
schema = StructType([
	StructField("row_id", StringType(), True),
	StructField("order_id", StringType(), True),
	StructField("order_date", StringType(), True),
	StructField("ship_date", StringType(), True),
	StructField("ship_mode", StringType(), True),
	StructField("customer_id", StringType(), True),
	StructField("customer_name", StringType(), True),
	StructField("segment", StringType(), True),
	StructField("country", StringType(), True),
	StructField("city", StringType(), True),
	StructField("state", StringType(), True),
	StructField("postal_code", StringType(), True),
	StructField("region", StringType(), True),
	StructField("product_id", StringType(), True),
	StructField("category", StringType(), True),
	StructField("sub_category", StringType(), True),
	StructField("product_name", StringType(), True),
	StructField("sales", DoubleType(), True),
	StructField("quantity", IntegerType(), True),
	StructField("discount", DoubleType(), True),
	StructField("profit", DoubleType(), True)
])

#create df with schema define
superstore_df = spark.read.csv("D:/Work/training/sample_training/Superstores.csv", sep="|", schema=schema, header=None)

In [5]:
#https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=read%20csv

#create df with automatically define schema
superstore_df = spark.read.csv("D:/Work/training/sample_training/Superstores.csv", sep="|", inferSchema=True, header=True)

In [6]:
superstore_df.dtypes

[('Row ID', 'int'),
 ('Order ID', 'string'),
 ('Order Date', 'string'),
 ('Ship Date', 'string'),
 ('Ship Mode', 'string'),
 ('Customer ID', 'string'),
 ('Customer Name', 'string'),
 ('Segment', 'string'),
 ('Country', 'string'),
 ('City', 'string'),
 ('State', 'string'),
 ('Postal Code', 'int'),
 ('Region', 'string'),
 ('Product ID', 'string'),
 ('Category', 'string'),
 ('Sub-Category', 'string'),
 ('Product Name', 'string'),
 ('Sales', 'double'),
 ('Quantity', 'int'),
 ('Discount', 'double'),
 ('Profit', 'double')]

In [7]:
#Select dataframe column
superstore_df.select("Order Date").show()

+----------+
|Order Date|
+----------+
| 11/8/2016|
| 11/8/2016|
| 6/12/2016|
|10/11/2015|
|10/11/2015|
|  6/9/2014|
|  6/9/2014|
|  6/9/2014|
|  6/9/2014|
|  6/9/2014|
|  6/9/2014|
|  6/9/2014|
| 4/15/2017|
| 12/5/2016|
|11/22/2015|
|11/22/2015|
|11/11/2014|
| 5/13/2014|
| 8/27/2014|
| 8/27/2014|
+----------+
only showing top 20 rows



In [8]:
import datetime

#python cast to date
def castToDate(dates):
    new_dates = datetime.datetime.strptime(dates, '%d/%m/%Y').date()
    return new_dates

#to convert string use .strftime('%Y-%m-%d')

In [9]:
from pyspark.sql.functions import lit, to_date, year, month, dayofmonth

#Convert string dataframe to date dataframe with specific format
temp_df = superstore_df\
    .withColumn('Order Date', lit(to_date(superstore_df['Order Date'], 'mm/dd/yyyy')))\
    .withColumn('Ship Date', lit(to_date(superstore_df['Ship Date'], 'mm/dd/yyyy')))

In [10]:
temp_df.select("Order Date").head(3)

[Row(Order Date=datetime.date(2016, 1, 8)),
 Row(Order Date=datetime.date(2016, 1, 8)),
 Row(Order Date=datetime.date(2016, 1, 12))]

In [18]:
import pyspark.sql.functions as F

#function to Concat column in dataframe
def myConcat(*cols):
    return F.concat(*[F.coalesce(c, F.lit("*")) for c in cols])

#Or you can use:
#new_df = temp_df.withColumn('Period', myConcat.concat(year("Order Date"), month("Order Date"))).show()
#new_df = temp_df.withColumn('Period', F.concat(year("Order Date"), month("Order Date"))).show()

In [38]:
"""
dd = (dd.withColumn('month', F.when(F.length(F.col('month')) == 1, F.concat(F.lit('0'), F.col('month'))).otherwise(F.col('month')))
        .withColumn('date', F.when(F.length(F.col('date')) == 1, F.concat(F.lit('0'), F.col('date'))).otherwise(F.col('date')))
        .withColumn('hhmm', F.when(F.length(F.col('hhmm')) == 1, F.concat(F.lit('000'), F.col('hhmm')))
                             .when(F.length(F.col('hhmm')) == 2, F.concat(F.lit('00'), F.col('hhmm')))
                             .when(F.length(F.col('hhmm')) == 3, F.concat(F.lit('0'), F.col('hhmm')))
                             .otherwise(F.col('hhmm')))
        .withColumn('time', F.to_timestamp(F.concat(*dd.columns), format='yyyyMMddHHmm'))
     )
"""
#F.when(F.length(F.col(temp_df.select(month('Order Date')))) == 1, F.concat(F.lit('0'), F.col(temp_df.select(month('Order Date'))))).otherwise(F.col(temp_df.select(month("Order Date"))))

new_df = temp_df.withColumn('Period', F.concat(year("Order Date"), month("Order Date")))

In [40]:
new_df.head(1)

[Row(Row ID=1, Order ID='CA-2016-152156', Order Date=datetime.date(2016, 1, 8), Ship Date=datetime.date(2016, 1, 11), Ship Mode='Second Class', Customer ID='CG-12520', Customer Name='Claire Gute', Segment='Consumer', Country='United States', City='Henderson', State='Kentucky', Postal Code=42420, Region='South', Product ID='FUR-BO-10001798', Category='Furniture', Sub-Category='Bookcases', Product Name='Bush Somerset Collection Bookcase', Sales=261.96, Quantity=2, Discount=0.0, Profit=41.9136, Period='20161')]

In [67]:
new_df.groupBy("Period").agg({"Sales":"sum"}).alias("sum_sales").orderBy("sum_sales.sum(Sales)").show()   

+------+------------------+
|Period|        sum(Sales)|
+------+------------------+
| 20151|470532.50899999985|
| 20141| 484247.4981000009|
| 20161| 609205.5980000008|
| 20171| 733215.2551999999|
+------+------------------+

