# Data Engineer

### Importing Libraries

In [16]:
import pandas as pd
import numpy as np

### Reading the CSV data file:

In [78]:
dataset = pd.read_csv("/content/drive/MyDrive/2022_02_08-02_30_31_AM.csv")

In [79]:
dataset.head(5)

Unnamed: 0,uuid,price_string,price_string_unf,product_type,level_1,category
0,638744a4-b0ae-4166-8cb6-5c063c862036,,,TGFwdG9wIENvdmVycy9CYWdz,TGFwdG9wIENhc2U,RWxlY3Ryb25pY3M
1,ab313969-02cc-48b2-9daf-0054efb70b92,,,QmFraW5nIEN1cHMgYW5kIE1hdHM,QmFraW5nIE1hdHMgLyBCYWtpbmcgZGlzaA,a2l0Y2hpbmcgYW5kIGRpbmluZw
2,acbd66ff-79f8-467a-91f9-108a45af5626,$19.95,,R3VtbWllcyB2aXRhbWlucyBhbmQgbWluZXJhbHMgZm9yIG...,SW1tdW5pdHkgZ3VtbWllcw,SGVhbHRo
3,963915d6-b2e3-4098-b242-9410593cf205,$92.00,,U2VydW1z,RmFjZSBTZXJ1bQ,YmVhdXR5IGFuZCBwZXJzb25hbCBjYXJl
4,b5b68f3c-b1e0-40e5-8ee5-e2f7236c1daf,11.50,,RWF0aW5nIFV0ZW5zaWxzL0N1dGxlcnk,Q2hvcHN0aWNrcw,a2l0Y2hpbmcgYW5kIGRpbmluZw


### Installing and Importing PySpark:

In [8]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.2.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 30 kB/s 
[?25hCollecting py4j==0.10.9.3
  Downloading py4j-0.10.9.3-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 59.4 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.1-py2.py3-none-any.whl size=281853642 sha256=674f099192bfcfe63fdc2465338f41e3a5b49a6d0cf39aabb69733f6de37ebd8
  Stored in directory: /root/.cache/pip/wheels/9f/f5/07/7cd8017084dce4e93e84e92efd1e1d5334db05f2e83bcef74f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [9]:
import pyspark

### Importing SparkContext:
- It is the main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster. 

### Importing SparkSession:
- SparkSession library of PySpark will allow us to create DataFrame, run SQL Queries over Tables

In [10]:
from pyspark import SparkContext
from pyspark.sql import SQLContext


sc = SparkContext('local', 'Spark SQL') 
sqlc = SQLContext(sc)



#### Creating a Session

In [14]:
from pyspark.sql import SparkSession
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.appName('Cool Data Engineer Stuffs').getOrCreate()

### Replacing all the NaN values to Zeros:

In [81]:
dataset.replace(np.nan , 0  , inplace = True)

In [82]:
dataset.head(5)

Unnamed: 0,uuid,price_string,price_string_unf,product_type,level_1,category
0,638744a4-b0ae-4166-8cb6-5c063c862036,0,0,TGFwdG9wIENvdmVycy9CYWdz,TGFwdG9wIENhc2U,RWxlY3Ryb25pY3M
1,ab313969-02cc-48b2-9daf-0054efb70b92,0,0,QmFraW5nIEN1cHMgYW5kIE1hdHM,QmFraW5nIE1hdHMgLyBCYWtpbmcgZGlzaA,a2l0Y2hpbmcgYW5kIGRpbmluZw
2,acbd66ff-79f8-467a-91f9-108a45af5626,$19.95,0,R3VtbWllcyB2aXRhbWlucyBhbmQgbWluZXJhbHMgZm9yIG...,SW1tdW5pdHkgZ3VtbWllcw,SGVhbHRo
3,963915d6-b2e3-4098-b242-9410593cf205,$92.00,0,U2VydW1z,RmFjZSBTZXJ1bQ,YmVhdXR5IGFuZCBwZXJzb25hbCBjYXJl
4,b5b68f3c-b1e0-40e5-8ee5-e2f7236c1daf,11.50,0,RWF0aW5nIFV0ZW5zaWxzL0N1dGxlcnk,Q2hvcHN0aWNrcw,a2l0Y2hpbmcgYW5kIGRpbmluZw


#### Replacing the $ charater from the price:

In [101]:
new_dataset = dataset['price_string'] = dataset['price_string'].str.replace("$", " ")

  """Entry point for launching an IPython kernel.


In [102]:
new_dataset.head(4)

0       NaN
1       NaN
2     19.95
3     92.00
Name: price_string, dtype: object

In [105]:
dataset.head(4)

Unnamed: 0,uuid,price_string,price_string_unf,product_type,level_1,category
0,638744a4-b0ae-4166-8cb6-5c063c862036,,0,TGFwdG9wIENvdmVycy9CYWdz,TGFwdG9wIENhc2U,RWxlY3Ryb25pY3M
1,ab313969-02cc-48b2-9daf-0054efb70b92,,0,QmFraW5nIEN1cHMgYW5kIE1hdHM,QmFraW5nIE1hdHMgLyBCYWtpbmcgZGlzaA,a2l0Y2hpbmcgYW5kIGRpbmluZw
2,acbd66ff-79f8-467a-91f9-108a45af5626,19.95,0,R3VtbWllcyB2aXRhbWlucyBhbmQgbWluZXJhbHMgZm9yIG...,SW1tdW5pdHkgZ3VtbWllcw,SGVhbHRo
3,963915d6-b2e3-4098-b242-9410593cf205,92.0,0,U2VydW1z,RmFjZSBTZXJ1bQ,YmVhdXR5IGFuZCBwZXJzb25hbCBjYXJl


#### Reading "CSV" to use with spark.sql to run SQL over

In [57]:
data_csv = '/content/drive/MyDrive/2022_02_08-02_30_31_AM.csv'
data_ = spark.read.format('csv').load(data_csv)

In [58]:
data_ = spark.read.csv(data_csv, header = True, inferSchema = True)
data_.show(5)

+--------------------+------------+----------------+--------------------+--------------------+--------------------+
|                uuid|price_string|price_string_unf|        product_type|             level_1|            category|
+--------------------+------------+----------------+--------------------+--------------------+--------------------+
|638744a4-b0ae-416...|        null|            null|TGFwdG9wIENvdmVyc...|     TGFwdG9wIENhc2U|     RWxlY3Ryb25pY3M|
|ab313969-02cc-48b...|        null|            null|QmFraW5nIEN1cHMgY...|QmFraW5nIE1hdHMgL...|a2l0Y2hpbmcgYW5kI...|
|acbd66ff-79f8-467...|      $19.95|            null|R3VtbWllcyB2aXRhb...|SW1tdW5pdHkgZ3Vtb...|            SGVhbHRo|
|963915d6-b2e3-409...|      $92.00|            null|            U2VydW1z|      RmFjZSBTZXJ1bQ|YmVhdXR5IGFuZCBwZ...|
|b5b68f3c-b1e0-40e...|       11.50|            null|RWF0aW5nIFV0ZW5za...|      Q2hvcHN0aWNrcw|a2l0Y2hpbmcgYW5kI...|
+--------------------+------------+----------------+--------------------

In [62]:
data_.createOrReplaceTempView('data_')

In [63]:
# this is pyspark.sql dataframe
type(data_)

pyspark.sql.dataframe.DataFrame

#### Getting the price_string count with non-zero values: 

In [64]:
(spark.sql('select count(price_string) from data_ where price_string != 0')).show(1)

+-------------------+
|count(price_string)|
+-------------------+
|             210000|
+-------------------+



In [65]:
(spark.sql('select count(price_string) from data_')).show(1)

+-------------------+
|count(price_string)|
+-------------------+
|            5570000|
+-------------------+



#### Count of products without prices and with prices in each Product Type, Category, Level 1


In [66]:
(spark.sql('select price_string as price , product_type , level_1 , category from data_ where price_string != 0 group by price_string, product_type , category , level_1')).show(9)

+-----+--------------------+--------------------+--------------------+
|price|        product_type|             level_1|            category|
+-----+--------------------+--------------------+--------------------+
| 6.60|Q29va2luZyBVdGVuc...|             VG9uZ3M|a2l0Y2hpbmcgYW5kI...|
|14.95|U3R1ZmZlZCBBbmltY...|U3R1ZmZlZCBBcXVhd...| VG95cyBhbmQgR2FtZXM|
|67.00|UHJvZHVjZSBCYWdzI...|    UHJvZHVjZSBiYWdz|a2l0Y2hpbmcgYW5kI...|
|14.95|U3R1ZmZlZCBBbmltY...|  U3R1ZmZlZCBCZWFycw| VG95cyBhbmQgR2FtZXM|
|17.00|    Rm9vZCBTdG9yYWdl| Rm9vZCBDb250YWluZXI|a2l0Y2hpbmcgYW5kI...|
|13.00|Q29va2luZyBVdGVuc...| Q3V0dGluZyBCb2FyZHM|a2l0Y2hpbmcgYW5kI...|
|22.00|Q29va2luZyBVdGVuc...|TWl4aW5nIHNwb29uc...|a2l0Y2hpbmcgYW5kI...|
|14.95|U3R1ZmZlZCBBbmltY...|U3R1ZmZlZCBlYWdsZ...| VG95cyBhbmQgR2FtZXM|
| 5.75|Q29va2luZyBVdGVuc...|         U3BhdHVsYXM|a2l0Y2hpbmcgYW5kI...|
+-----+--------------------+--------------------+--------------------+
only showing top 9 rows



In [67]:
data_.show(10)

+--------------------+------------+----------------+--------------------+--------------------+--------------------+
|                uuid|price_string|price_string_unf|        product_type|             level_1|            category|
+--------------------+------------+----------------+--------------------+--------------------+--------------------+
|638744a4-b0ae-416...|        null|            null|TGFwdG9wIENvdmVyc...|     TGFwdG9wIENhc2U|     RWxlY3Ryb25pY3M|
|ab313969-02cc-48b...|        null|            null|QmFraW5nIEN1cHMgY...|QmFraW5nIE1hdHMgL...|a2l0Y2hpbmcgYW5kI...|
|acbd66ff-79f8-467...|      $19.95|            null|R3VtbWllcyB2aXRhb...|SW1tdW5pdHkgZ3Vtb...|            SGVhbHRo|
|963915d6-b2e3-409...|      $92.00|            null|            U2VydW1z|      RmFjZSBTZXJ1bQ|YmVhdXR5IGFuZCBwZ...|
|b5b68f3c-b1e0-40e...|       11.50|            null|RWF0aW5nIFV0ZW5za...|      Q2hvcHN0aWNrcw|a2l0Y2hpbmcgYW5kI...|
|389d9f75-cc3f-4bd...|        null|            null|TmF0dXJhbCBTd2Vld...

In [68]:
type(data_)

pyspark.sql.dataframe.DataFrame