In [0]:
keys = spark.read.csv('/FileStore/tables/dev_user_databricks_accessKeys.csv', inferSchema=True, header=True)
Access_Id = keys.first()[0]
Secret_Id = keys.first()[1]

access_key = Access_Id
secret_key = Secret_Id
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", access_key)
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", secret_key)

aws_region = "us-east-1"
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3." + aws_region + ".amazonaws.com")

In [0]:
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, TimestampType

schema = StructType([
    StructField('InvoiceNo', IntegerType(), True),
    StructField('StockCode', StringType(), True),
    StructField('Description', StringType(), True),
    StructField('Quantity', IntegerType(), True),
    StructField('InvoiceDate', StringType(), True),
    StructField('UnitPrice', DoubleType(), True),
    StructField('CustomerID', StringType(), True),
    StructField('Country', StringType(), True),
])

In [0]:
df = spark.read.csv('s3://project-bucket-17/Cust_transaction_Data.csv', schema=schema, header=True)
print(df.count(), len(df.columns))
df.show(5)

541909 8
+---------+---------+--------------------+--------+----------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|     InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+----------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12-01-2010 08:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12-01-2010 08:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12-01-2010 08:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12-01-2010 08:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12-01-2010 08:26|     3.39|     17850|United Kingdom|
+---------+---------+--------------------+--------+----------------+---------+----------+--------------+
only showing top 5 rows



In [0]:
summary = df.describe()
summary.show()

+-------+------------------+------------------+--------------------+------------------+----------------+-----------------+------------------+-----------+
|summary|         InvoiceNo|         StockCode|         Description|          Quantity|     InvoiceDate|        UnitPrice|        CustomerID|    Country|
+-------+------------------+------------------+--------------------+------------------+----------------+-----------------+------------------+-----------+
|  count|            532618|            541909|              540455|            541909|          541909|           541909|            406829|     541909|
|   mean|  559965.752026781|27623.240210938104|             20713.0|  9.55224954743324|            null|4.611113626089712|15287.690570239585|       null|
| stddev|13428.417280797716|16799.737628427665|                null|218.08115785023435|            null|96.75985306117957|1713.6003033215968|       null|
|    min|            536365|             10002| 4 PURPLE FLOCK D...|        

In [0]:
# Keep only United Kingdom data
uk_data = df.filter(col('Country') == 'United Kingdom')
print(uk_data.count(), len(uk_data.columns))
uk_data.show(5)

495478 8
+---------+---------+--------------------+--------+----------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|     InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+----------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12-01-2010 08:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12-01-2010 08:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12-01-2010 08:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12-01-2010 08:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12-01-2010 08:26|     3.39|     17850|United Kingdom|
+---------+---------+--------------------+--------+----------------+---------+----------+--------------+
only showing top 5 rows



In [0]:
# Remove rows with missing CustomerID
uk_data = uk_data.filter(col('CustomerID').isNotNull())
print(uk_data.count(), len(uk_data.columns))

361878 8


In [0]:
# Filter out records with Positive values
uk_data = uk_data.filter(col('Quantity') > 0)
print(uk_data.count(), len(uk_data.columns))

354345 8


In [0]:
from pyspark.sql.functions import to_timestamp, coalesce

format1 = "MM-dd-yyyy HH:mm"
format2 = "MM/dd/yyyy HH:mm"
format3 = "MM/dd/yyyy H:mm"

# Convert to timestamp
df = df.withColumn(
    "date_parsed",
    coalesce(
        to_timestamp(col('InvoiceDate'), format1),
        to_timestamp(col('InvoiceDate'), format2),
        to_timestamp(col('InvoiceDate'), format3)
    )
)

df.show(5)


+---------+---------+--------------------+--------+----------------+---------+----------+--------------+-------------------+
|InvoiceNo|StockCode|         Description|Quantity|     InvoiceDate|UnitPrice|CustomerID|       Country|        date_parsed|
+---------+---------+--------------------+--------+----------------+---------+----------+--------------+-------------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12-01-2010 08:26|     2.55|     17850|United Kingdom|2010-12-01 08:26:00|
|   536365|    71053| WHITE METAL LANTERN|       6|12-01-2010 08:26|     3.39|     17850|United Kingdom|2010-12-01 08:26:00|
|   536365|   84406B|CREAM CUPID HEART...|       8|12-01-2010 08:26|     2.75|     17850|United Kingdom|2010-12-01 08:26:00|
|   536365|   84029G|KNITTED UNION FLA...|       6|12-01-2010 08:26|     3.39|     17850|United Kingdom|2010-12-01 08:26:00|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12-01-2010 08:26|     3.39|     17850|United Kingdom|2010-12-01 08:26:00|


In [0]:
# Add new column depicting total amount
uk_data = uk_data.withColumn("TotalAmount", col("Quantity") * col("UnitPrice"))

In [0]:
uk_data.count(), len(uk_data.columns)

Out[49]: (354345, 9)

In [0]:
from pyspark.sql.functions import col, count, when
null_counts = uk_data.select([count(when(col(c).isNull(), c)).alias(c) for c in uk_data.columns])
null_counts.show()

+---------+---------+-----------+--------+-----------+---------+----------+-------+-----------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|TotalAmount|
+---------+---------+-----------+--------+-----------+---------+----------+-------+-----------+
|        0|        0|          0|       0|          0|        0|         0|      0|          0|
+---------+---------+-----------+--------+-----------+---------+----------+-------+-----------+



In [0]:
# Creating repartition as 1 to get single file
uk_data = uk_data.repartition(1)

In [0]:
# Exporting the file into s3 bucket as a parquet format
# uk_data.write.mode('overwrite').parquet('s3://project-bucket-17/Output_uk_data')