In [116]:
#install google-cloud-storage
#!pip install google-cloud-storage

In [117]:
#this is to find where spark is installed on my system
import findspark
findspark.init()

In [118]:
#import pyspark
import pyspark
from pyspark.sql import SparkSession

In [119]:
#import gcs storage and pandas 
from google.cloud import storage
import pandas as pd

In [120]:
#import local engine spark builder
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [121]:
#we define the file to download from the bucket in GCS
#https://cloud.google.com/storage/docs/downloading-objects#storage-download-object-python
def download_blob(bucket_name, source_blob_name, destination_file_name):
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(source_blob_name)
    blob.download_to_filename(destination_file_name)
    print(
        "Downloaded storage object {} from bucket {} to local file {}.".format(
            source_blob_name, bucket_name, destination_file_name
        )
    )

In [122]:
#download the file from GCS to a folder in the local machine
bucket_name ="online-retail11401"
source_blob_name = "kaggle/mashlyn/online_retail_II.parquet"
destination_file_name ="/home/deen/Desktop/dphi/project/Notebook/dataset/online_retail.parquet"
download_blob(bucket_name, source_blob_name, destination_file_name)

Downloaded storage object kaggle/mashlyn/online_retail_II.parquet from bucket online-retail11401 to local file /home/deen/Desktop/dphi/project/Notebook/dataset/online_retail.parquet.


In [123]:
#read the dateframe with pandas 
df = pd.read_parquet('dataset/online_retail.parquet')
df = pd.DataFrame(df) 
df

Unnamed: 0,Invoice,StockCode,Description,Quantity,InvoiceDate,Price,Customer ID,Country
0,489434,85048,15CM CHRISTMAS GLASS BALL 20 LIGHTS,12,2009-12-01 07:45:00,6.95,13085.0,United Kingdom
1,489434,79323P,PINK CHERRY LIGHTS,12,2009-12-01 07:45:00,6.75,13085.0,United Kingdom
2,489434,79323W,WHITE CHERRY LIGHTS,12,2009-12-01 07:45:00,6.75,13085.0,United Kingdom
3,489434,22041,"RECORD FRAME 7"" SINGLE SIZE",48,2009-12-01 07:45:00,2.10,13085.0,United Kingdom
4,489434,21232,STRAWBERRY CERAMIC TRINKET BOX,24,2009-12-01 07:45:00,1.25,13085.0,United Kingdom
...,...,...,...,...,...,...,...,...
1067366,581587,22899,CHILDREN'S APRON DOLLY GIRL,6,2011-12-09 12:50:00,2.10,12680.0,France
1067367,581587,23254,CHILDRENS CUTLERY DOLLY GIRL,4,2011-12-09 12:50:00,4.15,12680.0,France
1067368,581587,23255,CHILDRENS CUTLERY CIRCUS PARADE,4,2011-12-09 12:50:00,4.15,12680.0,France
1067369,581587,22138,BAKING SET 9 PIECE RETROSPOT,3,2011-12-09 12:50:00,4.95,12680.0,France


In [124]:
#we transform the pandas Dataframe to a Spark Dataframe
df_spark = spark.createDataFrame(df)

In [125]:
#shows the spark dataframe
df_spark.show()

+-------+---------+--------------------+--------+-------------------+-----+-----------+--------------+
|Invoice|StockCode|         Description|Quantity|        InvoiceDate|Price|Customer ID|       Country|
+-------+---------+--------------------+--------+-------------------+-----+-----------+--------------+
| 489434|    85048|15CM CHRISTMAS GL...|      12|2009-12-01 07:45:00| 6.95|    13085.0|United Kingdom|
| 489434|   79323P|  PINK CHERRY LIGHTS|      12|2009-12-01 07:45:00| 6.75|    13085.0|United Kingdom|
| 489434|   79323W| WHITE CHERRY LIGHTS|      12|2009-12-01 07:45:00| 6.75|    13085.0|United Kingdom|
| 489434|    22041|RECORD FRAME 7" S...|      48|2009-12-01 07:45:00|  2.1|    13085.0|United Kingdom|
| 489434|    21232|STRAWBERRY CERAMI...|      24|2009-12-01 07:45:00| 1.25|    13085.0|United Kingdom|
| 489434|    22064|PINK DOUGHNUT TRI...|      24|2009-12-01 07:45:00| 1.65|    13085.0|United Kingdom|
| 489434|    21871| SAVE THE PLANET MUG|      24|2009-12-01 07:45:00| 1.2

In [132]:
#import types to shape the schema headings 
from pyspark.sql.types import *
from pyspark.sql import functions as F


In [133]:
#casting each data into their Datatype and we write the output as df_spark
df_spark \
    .withColumnRenamed("Customer ID","CustomerID")\
    .withColumn("CustomerID",F.col("CustomerID").cast(IntegerType()))\
    .withColumn("Invoice",F.col("Invoice").cast(StringType()))\
    .withColumn("StockCode",F.col("StockCode").cast(StringType()))\
    .withColumn("Description",F.col("Description").cast(StringType()))\
    .withColumn("Quantity",F.col("Quantity").cast(IntegerType()))\
    .withColumn("InvoiceDate",F.col("InvoiceDate").cast(TimestampType()))\
    .withColumn("Price",F.col("Price").cast(DoubleType()))\
    .withColumn("Country",F.col("Country").cast(StringType()))\
    .repartition(10) \
    .write.parquet('df_spark', mode='overwrite')



In [134]:
#reading the new_data we wrote (df_spark)
df_spark_retail  = spark.read \
    .option("header", "true") \
    .parquet('df_spark')

In [135]:
df_spark_retail.show()

+-------+---------+--------------------+--------+-------------------+-----+----------+--------------+
|Invoice|StockCode|         Description|Quantity|        InvoiceDate|Price|CustomerID|       Country|
+-------+---------+--------------------+--------+-------------------+-----+----------+--------------+
| 499897|   85230A|OPIUM SCENTED VOT...|      48|2010-03-03 11:53:00| 0.21|     14298|United Kingdom|
| 505453|    21973|SET OF 36 MUSHROO...|      24|2010-04-22 11:33:00| 1.45|     12625|       Germany|
| 501103|   84970S|HANGING HEART ZIN...|      12|2010-03-12 13:07:00| 0.85|     14824|United Kingdom|
| 513750|   47591D|PINK FAIRY CAKE C...|       3|2010-06-28 12:38:00| 1.95|     17019|United Kingdom|
| 509536|    22621|TRADITIONAL KNITT...|      12|2010-05-24 12:21:00| 1.45|     14911|          EIRE|
| 500356|    21775|DECORATIVE FLORE ...|       6|2010-03-07 15:34:00| 1.25|     16984|United Kingdom|
| 503549|    21507|ELEPHANT, BIRTHDA...|       1|2010-04-01 15:04:00| 0.85|       

In [142]:
#a custom defined function to be called on certain rows 
#this returns the products of 2 columns and rounds the answer up to 2dp
def Total_Cost(x,y):
    total = F.round((x * y),2)
    return total

#this returns the absolute value of a number 
def Absolute_Val(x):
    x = F.abs(x)
    return x

In [169]:
#here, we create a new column (Total) which is the product of quantity and price,
#convert the negative values in quantity to positive values, 
#and negative prices to positive prices

df_spark_retail \
    .withColumn("Total", Absolute_Val(Total_Cost(F.col("Quantity"), F.col("Price"))))  \
    .withColumn("Quantity", Absolute_Val(F.col("Quantity"))) \
    .withColumn("Price", Absolute_Val(F.col("Price"))) \
    .repartition(10)\
    .write.parquet('online_retail', mode='overwrite')

In [170]:
df_spark_retail.show()

+-------+---------+--------------------+--------+-------------------+-----+----------+--------------+-----+
|Invoice|StockCode|         Description|Quantity|        InvoiceDate|Price|CustomerID|       Country|Total|
+-------+---------+--------------------+--------+-------------------+-----+----------+--------------+-----+
| 492079|    85042|ANTIQUE LILY FAIR...|       8|2009-12-15 13:49:00| 4.95|     15070|United Kingdom| 39.6|
| 558813|    22134|MINI LADLE LOVE H...|      12|2011-07-04 11:42:00| 0.42|     12682|        France| 5.04|
| 563155|    21240|   BLUE POLKADOT CUP|       5|2011-08-12 11:48:00| 0.85|     13069|United Kingdom| 4.25|
| 579297|   85099C|JUMBO  BAG BAROQU...|       5|2011-11-29 11:23:00| 4.13|         0|United Kingdom|20.65|
| 581219|    21012|ANTIQUE ALL GLASS...|       2|2011-12-08 09:28:00| 2.46|         0|United Kingdom| 4.92|
| 545719|    22870|NUMBER TILE COTTA...|       8|2011-03-07 10:28:00| 1.95|     13344|United Kingdom| 15.6|
| 523868|    21181|PLEASE ON

In [171]:
#reading the new_data written above(online_retail_cleaned)
df_spark_retail  = spark.read \
    .option("header", "true") \
    .parquet('online_retail')

In [172]:
#Before writing sql on table, we have to make it readable in spark sql
df_spark_retail.registerTempTable('df_spark_retail')

In [173]:
#here we check the total amount of items in the columns which was 1067371
df_spark_retail.count()

1067371

In [174]:
#this SQl was used to view the columns before and after modifications


df_spark_retail = spark.sql("""
SELECT
    *
FROM
    df_spark_retail 
""")

In [175]:
#we see that 1067371 items were returned, which is total
df_spark_retail.count()

1067371

In [176]:
#here we check the qty of invoice that had "c" in them, which indicates goods that were returned 
#according to the meta data
df_spark_retail_C = spark.sql("""
SELECT
    count(1)
FROM
    df_spark_retail 
WHERE
    Invoice LIKE 'C%'

""")

In [177]:
df_spark_retail_C.show()

+--------+
|count(1)|
+--------+
|   19494|
+--------+



In [178]:
#here we check the qty of items after removing the ones that were returned, 
#to see if it adds up to the total
#1047877 + 19494 = 1067371
df_spark_retail.where(~F.col("Invoice").like("C%")).count()

1047877

In [180]:
#df_spark_retail(F.col("Invoice").startsWith("C")).show()
df_spark_retail_Final = df_spark_retail.where(~F.col("Invoice").like("C%"))

In [182]:
df_spark_retail_Final.write.parquet('online_retail_cleaned', mode='overwrite')

In [183]:
#reading the new_data
df_spark_retail_final  = spark.read \
    .option("header", "true") \
    .parquet('online_retail_cleaned')

In [184]:
df_spark_retail_final.show()

+-------+---------+--------------------+--------+-------------------+-----+----------+--------------+-----+
|Invoice|StockCode|         Description|Quantity|        InvoiceDate|Price|CustomerID|       Country|Total|
+-------+---------+--------------------+--------+-------------------+-----+----------+--------------+-----+
| 568346|    22960|JAM MAKING SET WI...|       5|2011-09-26 15:28:00| 8.29|     14096|United Kingdom|41.45|
| 518795|    21535|RED RETROSPOT SMA...|       6|2010-08-12 10:03:00| 2.55|     14276|United Kingdom| 15.3|
| 548863|    21389|IVORY HANGING DEC...|      24|2011-04-04 13:57:00| 0.85|     15311|United Kingdom| 20.4|
| 553011|    20724|RED RETROSPOT CHA...|      10|2011-05-12 17:03:00| 0.85|     14390|United Kingdom|  8.5|
| 557937|    22059|CERAMIC STRAWBERR...|       2|2011-06-23 15:30:00| 3.29|         0|United Kingdom| 6.58|
| 525071|    21212|PACK OF 72 RETROS...|      24|2010-10-03 14:50:00| 0.55|     12583|        France| 13.2|
| 540681|   85180A|RED HEART

In [185]:
df_spark_retail_final.where(~F.col("Invoice").like("C%")).count()

1047877

In [186]:
df = pd.read_parquet('online_retail_cleaned')
df = pd.DataFrame(df) 
df

Unnamed: 0,Invoice,StockCode,Description,Quantity,InvoiceDate,Price,CustomerID,Country,Total
0,568346,22960,JAM MAKING SET WITH JARS,5,2011-09-26 13:28:00,8.29,14096,United Kingdom,41.45
1,518795,21535,RED RETROSPOT SMALL MILK JUG,6,2010-08-12 08:03:00,2.55,14276,United Kingdom,15.30
2,548863,21389,IVORY HANGING DECORATION BIRD,24,2011-04-04 11:57:00,0.85,15311,United Kingdom,20.40
3,553011,20724,RED RETROSPOT CHARLOTTE BAG,10,2011-05-12 15:03:00,0.85,14390,United Kingdom,8.50
4,557937,22059,CERAMIC STRAWBERRY DESIGN MUG,2,2011-06-23 13:30:00,3.29,0,United Kingdom,6.58
...,...,...,...,...,...,...,...,...,...
1047872,525879,22382,LUNCH BAG SPACEBOY DESIGN,10,2010-10-07 12:24:00,1.65,15228,United Kingdom,16.50
1047873,549568,21080,SET/20 RED RETROSPOT PAPER NAPKINS,12,2011-04-10 13:09:00,0.85,15665,United Kingdom,10.20
1047874,511259,22503,CABIN BAG VINTAGE PAISLEY,1,2010-06-07 10:42:00,29.95,16382,United Kingdom,29.95
1047875,556244,22467,GUMBALL COAT RACK,2,2011-06-09 14:08:00,2.55,15867,United Kingdom,5.10


In [187]:
#reading the new_data
df_spark_retail  = spark.read \
    .option("header", "true") \
    .parquet('online_retail_cleaned')

In [188]:
df_spark_retail.show()

+-------+---------+--------------------+--------+-------------------+-----+----------+--------------+-----+
|Invoice|StockCode|         Description|Quantity|        InvoiceDate|Price|CustomerID|       Country|Total|
+-------+---------+--------------------+--------+-------------------+-----+----------+--------------+-----+
| 568346|    22960|JAM MAKING SET WI...|       5|2011-09-26 15:28:00| 8.29|     14096|United Kingdom|41.45|
| 518795|    21535|RED RETROSPOT SMA...|       6|2010-08-12 10:03:00| 2.55|     14276|United Kingdom| 15.3|
| 548863|    21389|IVORY HANGING DEC...|      24|2011-04-04 13:57:00| 0.85|     15311|United Kingdom| 20.4|
| 553011|    20724|RED RETROSPOT CHA...|      10|2011-05-12 17:03:00| 0.85|     14390|United Kingdom|  8.5|
| 557937|    22059|CERAMIC STRAWBERR...|       2|2011-06-23 15:30:00| 3.29|         0|United Kingdom| 6.58|
| 525071|    21212|PACK OF 72 RETROS...|      24|2010-10-03 14:50:00| 0.55|     12583|        France| 13.2|
| 540681|   85180A|RED HEART

In [1]:
#upload to bucket
!gsutil -m cp -r online_retail_cleaned/ gs://online-retail11401/kaggle_cleaned/online_retail_cleaned

Copying file://online_retail_cleaned/.part-00001-d3c29033-f07f-48f9-8dc5-58eeb8bb0104-c000.snappy.parquet.crc [Content-Type=application/octet-stream]...
Copying file://online_retail_cleaned/part-00002-d3c29033-f07f-48f9-8dc5-58eeb8bb0104-c000.snappy.parquet [Content-Type=application/octet-stream]...
Copying file://online_retail_cleaned/._SUCCESS.crc [Content-Type=application/octet-stream]...
Copying file://online_retail_cleaned/part-00003-d3c29033-f07f-48f9-8dc5-58eeb8bb0104-c000.snappy.parquet [Content-Type=application/octet-stream]...
Copying file://online_retail_cleaned/_SUCCESS [Content-Type=application/octet-stream]...
Copying file://online_retail_cleaned/.part-00002-d3c29033-f07f-48f9-8dc5-58eeb8bb0104-c000.snappy.parquet.crc [Content-Type=application/octet-stream]...
Copying file://online_retail_cleaned/.part-00003-d3c29033-f07f-48f9-8dc5-58eeb8bb0104-c000.snappy.parquet.crc [Content-Type=application/octet-stream]...
Copying file://online_retail_cleaned/part-00000-d3c29033-f07f-

In [400]:
!ls


dataset  df_retail  df_spark  online_retail_cleaned  online-retail.ipynb
