In [1]:
from pyspark.sql import SparkSession

In [2]:
from pyspark.sql.functions import *

In [3]:
spark = SparkSession.builder.master('local[*]').getOrCreate()

In [4]:
df = spark.read.csv('OnlineRetail.csv' , header=True , inferSchema=True)

In [5]:
df.show(10)

+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|1/12/2018 08:26|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|1/12/2018 08:26|     3.39|   17850.0|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|1/12/2018 08:26|     2.75|   17850.0|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|1/12/2018 08:26|     3.39|   17850.0|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|1/12/2018 08:26|     3.39|   17850.0|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|1/12/2018 08:26|     7.65|   17850.0|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|1/12/2018 08:

In [6]:
df.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [7]:
df.describe().show()

+-------+-----------------+------------------+--------------------+------------------+---------------+-----------------+------------------+-----------+
|summary|        InvoiceNo|         StockCode|         Description|          Quantity|    InvoiceDate|        UnitPrice|        CustomerID|    Country|
+-------+-----------------+------------------+--------------------+------------------+---------------+-----------------+------------------+-----------+
|  count|           541909|            541909|              540455|            541909|         541909|           541909|            406829|     541909|
|   mean| 559965.752026781|27623.240210938104|             20713.0|  9.55224954743324|           null| 4.61111362608837|15287.690570239585|       null|
| stddev|13428.41728079708|16799.737628427687|                 NaN|218.08115785023466|           null|96.75985306117981|1713.6003033215923|       null|
|    min|           536365|             10002| 4 PURPLE FLOCK D...|            -80995|1/

In [8]:
# ^C = Cancel Product

df.where(
    df['InvoiceNo'].rlike('^C')
).show()

+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|  C536379|        D|            Discount|      -1|1/12/2018 09:41|     27.5|   14527.0|United Kingdom|
|  C536383|   35004C|SET OF 3 COLOURED...|      -1|1/12/2018 09:49|     4.65|   15311.0|United Kingdom|
|  C536391|    22556|PLASTERS IN TIN C...|     -12|1/12/2018 10:24|     1.65|   17548.0|United Kingdom|
|  C536391|    21984|PACK OF 12 PINK P...|     -24|1/12/2018 10:24|     0.29|   17548.0|United Kingdom|
|  C536391|    21983|PACK OF 12 BLUE P...|     -24|1/12/2018 10:24|     0.29|   17548.0|United Kingdom|
|  C536391|    21980|PACK OF 12 RED RE...|     -24|1/12/2018 10:24|     0.29|   17548.0|United Kingdom|
|  C536391|    21484|CHICK GREY HOT WA...|     -12|1/12/2018 10:

In [9]:
df2 = df.where(
    ~ df['InvoiceNo'].rlike('^C')
)

In [10]:
df2.where(
    df2['InvoiceNo'].rlike('^C')
).show()

+---------+---------+-----------+--------+-----------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
+---------+---------+-----------+--------+-----------+---------+----------+-------+



In [11]:
# Quantity < 0 

df2.where(
    df2['Quantity'] < 0
).show()

+---------+---------+-----------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+-----------+--------+---------------+---------+----------+--------------+
|   536589|    21777|       null|     -10|1/12/2018 16:50|      0.0|      null|United Kingdom|
|   536764|   84952C|       null|     -38|2/12/2018 14:42|      0.0|      null|United Kingdom|
|   536996|    22712|       null|     -20|3/12/2018 15:30|      0.0|      null|United Kingdom|
|   536997|    22028|       null|     -20|3/12/2018 15:30|      0.0|      null|United Kingdom|
|   536998|    85067|       null|      -6|3/12/2018 15:30|      0.0|      null|United Kingdom|
|   537000|    21414|       null|     -22|3/12/2018 15:32|      0.0|      null|United Kingdom|
|   537001|    21653|       null|      -6|3/12/2018 15:33|      0.0|      null|United Kingdom|
|   537003|    85126|       null|      -2|3/12/201

In [12]:
df3 = df2.where(
    df2['Quantity'] > 0
)

In [13]:
# UnitPrice < 0

df3.where(
    df3['UnitPrice'] < 0
).show()

+---------+---------+---------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|    Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+---------------+--------+---------------+---------+----------+--------------+
|  A563186|        B|Adjust bad debt|       1|12/8/2019 14:51|-11062.06|      null|United Kingdom|
|  A563187|        B|Adjust bad debt|       1|12/8/2019 14:52|-11062.06|      null|United Kingdom|
+---------+---------+---------------+--------+---------------+---------+----------+--------------+



In [14]:
df4 = df3.where(
    df3['UnitPrice'] > 0
)

In [15]:
df4.describe().show()

+-------+-----------------+------------------+--------------------+------------------+---------------+-----------------+------------------+-----------+
|summary|        InvoiceNo|         StockCode|         Description|          Quantity|    InvoiceDate|        UnitPrice|        CustomerID|    Country|
+-------+-----------------+------------------+--------------------+------------------+---------------+-----------------+------------------+-----------+
|  count|           530104|            530104|              530104|            530104|         530104|           530104|            397884|     530104|
|   mean|559981.4746888812|27591.351654656588|                null|10.542037034242338|           null|3.907625247122546|15294.423452564064|       null|
| stddev|13430.04973766428|16756.848658106624|                null|155.52412351063666|           null|35.91568110425539|1713.1415604398564|       null|
|    min|           536365|             10002| 4 PURPLE FLOCK D...|                 1|1/

In [16]:
# StockCode only digits

df4.where(
    df4['StockCode'].rlike('^[\D]')
).show()

+---------+------------+--------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|   StockCode|   Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+------------+--------------+--------+---------------+---------+----------+--------------+
|   536370|        POST|       POSTAGE|       3|1/12/2018 08:45|     18.0|   12583.0|        France|
|   536403|        POST|       POSTAGE|       1|1/12/2018 11:27|     15.0|   12791.0|   Netherlands|
|   536527|        POST|       POSTAGE|       1|1/12/2018 13:04|     18.0|   12662.0|       Germany|
|   536540|          C2|      CARRIAGE|       1|1/12/2018 14:05|     50.0|   14911.0|          EIRE|
|   536544|         DOT|DOTCOM POSTAGE|       1|1/12/2018 14:32|   569.77|      null|United Kingdom|
|   536569|           M|        Manual|       1|1/12/2018 15:35|     1.25|   16274.0|United Kingdom|
|   536569|           M|        Manual|       1|1/12/2018 15:35|    18.95|   16274.0|United

In [17]:
df5 = df4.withColumn(
    'StockCode' , regexp_replace('StockCode' , r'\D' , '')
)

In [18]:
df5.show()

+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|   536365|    85123|WHITE HANGING HEA...|       6|1/12/2018 08:26|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|1/12/2018 08:26|     3.39|   17850.0|United Kingdom|
|   536365|    84406|CREAM CUPID HEART...|       8|1/12/2018 08:26|     2.75|   17850.0|United Kingdom|
|   536365|    84029|KNITTED UNION FLA...|       6|1/12/2018 08:26|     3.39|   17850.0|United Kingdom|
|   536365|    84029|RED WOOLLY HOTTIE...|       6|1/12/2018 08:26|     3.39|   17850.0|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|1/12/2018 08:26|     7.65|   17850.0|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|1/12/2018 08:

In [19]:
df5.where(
    df5['StockCode'].rlike('^[\D]')
).show()

+---------+---------+-----------+--------+-----------+---------+----------+-------+
|InvoiceNo|StockCode|Description|Quantity|InvoiceDate|UnitPrice|CustomerID|Country|
+---------+---------+-----------+--------+-----------+---------+----------+-------+
+---------+---------+-----------+--------+-----------+---------+----------+-------+



In [20]:
df5.describe().show()

+-------+-----------------+------------------+--------------------+------------------+---------------+-----------------+------------------+-----------+
|summary|        InvoiceNo|         StockCode|         Description|          Quantity|    InvoiceDate|        UnitPrice|        CustomerID|    Country|
+-------+-----------------+------------------+--------------------+------------------+---------------+-----------------+------------------+-----------+
|  count|           530104|            530104|              530104|            530104|         530104|           530104|            397884|     530104|
|   mean|559981.4746888812|31469.135391528038|                null|10.542037034242338|           null|3.907625247122546|15294.423452564064|       null|
| stddev|13430.04973766428| 21457.07875925214|                null|155.52412351063666|           null|35.91568110425539|1713.1415604398564|       null|
|    min|           536365|                  | 4 PURPLE FLOCK D...|                 1|1/

In [21]:
df5.orderBy(asc('StockCode')).show()

+---------+---------+--------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|   Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------+--------+---------------+---------+----------+--------------+
|   575138|         |DOTCOM POSTAGE|       1|8/11/2019 15:24|  1483.21|      null|United Kingdom|
|   575144|         |       POSTAGE|       3|8/11/2019 15:38|     28.0|   12540.0|         Spain|
|   574741|         |        Manual|       6|6/11/2019 16:13|     1.45|   15993.0|United Kingdom|
|   575151|         |       POSTAGE|       1|8/11/2019 16:12|     15.0|   12395.0|       Belgium|
|   574844|         |       POSTAGE|       1|7/11/2019 11:57|     28.0|   12783.0|      Portugal|
|   575154|         |       POSTAGE|       2|8/11/2019 16:15|     18.0|   12395.0|       Belgium|
|   574862|         |       POSTAGE|       5|7/11/2019 12:27|     15.0|   12408.0|       Belgium|
|   574838|         

In [22]:
# StockCode 5 first Description letters

df6 = df5.withColumn(
    'StockCode' , when(df5['StockCode']=='' , df5['Description'].substr(1,5)).otherwise(df5['StockCode'])
)

In [23]:
df6.orderBy(desc('StockCode')).show()

+---------+---------+-----------+--------+----------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description|Quantity|     InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+-----------+--------+----------------+---------+----------+--------------+
|   572849|    SAMPL|    SAMPLES|       1|26/10/2019 12:20|    33.05|      null|United Kingdom|
|   549684|    SAMPL|    SAMPLES|       1| 11/4/2019 13:24|     30.0|      null|United Kingdom|
|   575901|    POSTA|    POSTAGE|       1|11/11/2019 15:19|     1.69|      null|United Kingdom|
|   575498|    POSTA|    POSTAGE|       1|10/11/2019 09:52|     18.0|   12708.0|       Germany|
|   575581|    POSTA|    POSTAGE|       1|10/11/2019 11:53|     18.0|   12659.0|        France|
|   574885|    POSTA|    POSTAGE|       1| 7/11/2019 14:01|     18.0|   12472.0|       Germany|
|   575584|    POSTA|    POSTAGE|       1|10/11/2019 11:55|     18.0|   12682.0|        France|
|   574838|    POSTA|    POSTAGE|       

In [24]:
# CustomerID is null

df6.where(
    df6['CustomerID'].isNull()
).show()

+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|   536544|    21773|DECORATIVE ROSE B...|       1|1/12/2018 14:32|     2.51|      null|United Kingdom|
|   536544|    21774|DECORATIVE CATS B...|       2|1/12/2018 14:32|     2.51|      null|United Kingdom|
|   536544|    21786|  POLKADOT RAIN HAT |       4|1/12/2018 14:32|     0.85|      null|United Kingdom|
|   536544|    21787|RAIN PONCHO RETRO...|       2|1/12/2018 14:32|     1.66|      null|United Kingdom|
|   536544|    21790|  VINTAGE SNAP CARDS|       9|1/12/2018 14:32|     1.66|      null|United Kingdom|
|   536544|    21791|VINTAGE HEADS AND...|       2|1/12/2018 14:32|     2.51|      null|United Kingdom|
|   536544|    21801|CHRISTMAS TREE DE...|      10|1/12/2018 14:

In [25]:
df7 = df6.withColumn(
    'CustomerID' , when(df6['CustomerID'].isNull() , 99999).otherwise(df6['CustomerID'])
)

In [26]:
df7.show()

+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|   536365|    85123|WHITE HANGING HEA...|       6|1/12/2018 08:26|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|1/12/2018 08:26|     3.39|   17850.0|United Kingdom|
|   536365|    84406|CREAM CUPID HEART...|       8|1/12/2018 08:26|     2.75|   17850.0|United Kingdom|
|   536365|    84029|KNITTED UNION FLA...|       6|1/12/2018 08:26|     3.39|   17850.0|United Kingdom|
|   536365|    84029|RED WOOLLY HOTTIE...|       6|1/12/2018 08:26|     3.39|   17850.0|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|1/12/2018 08:26|     7.65|   17850.0|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|1/12/2018 08:

In [27]:
df7.describe().show()

+-------+-----------------+------------------+--------------------+------------------+---------------+-----------------+------------------+-----------+
|summary|        InvoiceNo|         StockCode|         Description|          Quantity|    InvoiceDate|        UnitPrice|        CustomerID|    Country|
+-------+-----------------+------------------+--------------------+------------------+---------------+-----------------+------------------+-----------+
|  count|           530104|            530104|              530104|            530104|         530104|           530104|            530104|     530104|
|   mean|559981.4746888812|31469.135391528038|                null|10.542037034242338|           null|3.907625247122546|36421.672277515354|       null|
| stddev|13430.04973766428| 21457.07875925214|                null|155.52412351063666|           null|35.91568110425539|  36679.9589632566|       null|
|    min|           536365|            000110| 4 PURPLE FLOCK D...|                 1|1/

In [28]:
df7.printSchema()

root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: double (nullable = true)
 |-- Country: string (nullable = true)



In [29]:
df7.show()

+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|   536365|    85123|WHITE HANGING HEA...|       6|1/12/2018 08:26|     2.55|   17850.0|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|1/12/2018 08:26|     3.39|   17850.0|United Kingdom|
|   536365|    84406|CREAM CUPID HEART...|       8|1/12/2018 08:26|     2.75|   17850.0|United Kingdom|
|   536365|    84029|KNITTED UNION FLA...|       6|1/12/2018 08:26|     3.39|   17850.0|United Kingdom|
|   536365|    84029|RED WOOLLY HOTTIE...|       6|1/12/2018 08:26|     3.39|   17850.0|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|1/12/2018 08:26|     7.65|   17850.0|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|1/12/2018 08:

In [30]:
# CustomerID to int

df8 = df7.withColumn(
    'CustomerID' , df7['CustomerID'].cast('int')
)

In [31]:
# CustomerID to str

df9 = df8.withColumn(
    'CustomerID' , df8['CustomerID'].cast('string')
)

In [32]:
df9.show()

+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|    InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+---------------+---------+----------+--------------+
|   536365|    85123|WHITE HANGING HEA...|       6|1/12/2018 08:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|1/12/2018 08:26|     3.39|     17850|United Kingdom|
|   536365|    84406|CREAM CUPID HEART...|       8|1/12/2018 08:26|     2.75|     17850|United Kingdom|
|   536365|    84029|KNITTED UNION FLA...|       6|1/12/2018 08:26|     3.39|     17850|United Kingdom|
|   536365|    84029|RED WOOLLY HOTTIE...|       6|1/12/2018 08:26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|1/12/2018 08:26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|1/12/2018 08:

In [33]:
import pandas as pd

In [34]:
online_retail = df9.toPandas()

In [35]:
online_retail

Unnamed: 0,InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
0,536365,85123,WHITE HANGING HEART T-LIGHT HOLDER,6,1/12/2018 08:26,2.55,17850,United Kingdom
1,536365,71053,WHITE METAL LANTERN,6,1/12/2018 08:26,3.39,17850,United Kingdom
2,536365,84406,CREAM CUPID HEARTS COAT HANGER,8,1/12/2018 08:26,2.75,17850,United Kingdom
3,536365,84029,KNITTED UNION FLAG HOT WATER BOTTLE,6,1/12/2018 08:26,3.39,17850,United Kingdom
4,536365,84029,RED WOOLLY HOTTIE WHITE HEART.,6,1/12/2018 08:26,3.39,17850,United Kingdom
...,...,...,...,...,...,...,...,...
530099,581587,22613,PACK OF 20 SPACEBOY NAPKINS,12,9/12/2019 12:50,0.85,12680,France
530100,581587,22899,CHILDREN'S APRON DOLLY GIRL,6,9/12/2019 12:50,2.10,12680,France
530101,581587,23254,CHILDRENS CUTLERY DOLLY GIRL,4,9/12/2019 12:50,4.15,12680,France
530102,581587,23255,CHILDRENS CUTLERY CIRCUS PARADE,4,9/12/2019 12:50,4.15,12680,France


In [36]:
online_retail.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 530104 entries, 0 to 530103
Data columns (total 8 columns):
 #   Column       Non-Null Count   Dtype  
---  ------       --------------   -----  
 0   InvoiceNo    530104 non-null  object 
 1   StockCode    530104 non-null  object 
 2   Description  530104 non-null  object 
 3   Quantity     530104 non-null  int32  
 4   InvoiceDate  530104 non-null  object 
 5   UnitPrice    530104 non-null  float64
 6   CustomerID   530104 non-null  object 
 7   Country      530104 non-null  object 
dtypes: float64(1), int32(1), object(6)
memory usage: 30.3+ MB


In [37]:
# Converst InvoiceDate to datetime

online_retail['InvoiceDate'] = pd.to_datetime(online_retail['InvoiceDate'])

In [38]:
online_retail

Unnamed: 0,InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
0,536365,85123,WHITE HANGING HEART T-LIGHT HOLDER,6,2018-01-12 08:26:00,2.55,17850,United Kingdom
1,536365,71053,WHITE METAL LANTERN,6,2018-01-12 08:26:00,3.39,17850,United Kingdom
2,536365,84406,CREAM CUPID HEARTS COAT HANGER,8,2018-01-12 08:26:00,2.75,17850,United Kingdom
3,536365,84029,KNITTED UNION FLAG HOT WATER BOTTLE,6,2018-01-12 08:26:00,3.39,17850,United Kingdom
4,536365,84029,RED WOOLLY HOTTIE WHITE HEART.,6,2018-01-12 08:26:00,3.39,17850,United Kingdom
...,...,...,...,...,...,...,...,...
530099,581587,22613,PACK OF 20 SPACEBOY NAPKINS,12,2019-09-12 12:50:00,0.85,12680,France
530100,581587,22899,CHILDREN'S APRON DOLLY GIRL,6,2019-09-12 12:50:00,2.10,12680,France
530101,581587,23254,CHILDRENS CUTLERY DOLLY GIRL,4,2019-09-12 12:50:00,4.15,12680,France
530102,581587,23255,CHILDRENS CUTLERY CIRCUS PARADE,4,2019-09-12 12:50:00,4.15,12680,France


In [39]:
from sqlalchemy import create_engine

In [40]:
my_database = 'mysql+mysqldb://'+'beachratchata'+':'+'123456789'+'@'+'db4free.net'+':3306/'+'beachdata'+'?charset=utf8mb4'
engine = create_engine(my_database)

In [41]:
# Import Dataframe into database

online_retail.to_sql('online_retail' , engine , if_exists='replace' , index=False)