# Working on Big Data using PySpark
 *Created by Anmol Kumar [May 11 2022]*

### Libraries required:
* Pandas, PySpark. To view instructions on installing them, visit [Pandas](https://pandas.pydata.org/) and [PySpark](https://spark.apache.org/docs/latest/api/python/index.html).
* PySpark also requires Java. Visit [Java](https://www.java.com/en/) for installation instructions.

### Process followed:
* The CSV file is read with a ``SparkSession`` object. At ~1.38 GB, the file is too large to load comfortably into memory with Pandas, so PySpark is used instead.

##### Task 1:
* To get the products which don't have a price, I have used ``pyspark.sql.Column.isNull`` to filter the rows with ``price_string`` as ``null``.

##### Task 2:
* To count the products with and without prices for each value of Product Type, Category, and Level 1, I first removed the duplicate values in each of these columns.
* From each de-duplicated DataFrame, I extracted the unique values into a list.
* I then iterated over each value in the list, filtered the DataFrame to that value, and counted the rows with and without a price.
* ``pyspark.sql.DataFrame.count`` is used to get these numbers.

##### Task 3:
* Using ``pyspark.sql.DataFrame.filter``, ``pyspark.sql.Column.isNotNull``, ``pyspark.sql.Column.startswith`` and ``pyspark.sql.Column.substr``, the prices are processed in the following sequence.
* I first keep the entries with a usable ``price_string``, i.e., non-``null`` values.
* Of those, I keep only the entries that begin with a dollar sign, since only these can be split into a currency and a value.
* I then extract the numeric part into a separate ``value`` column and record the currency in a ``currency`` column.

##### Task 4:
* Iterating over the categories, I use ``pyspark.sql.DataFrame.agg`` to compute the average price in each category.

### How to run the notebook?
* Make sure the required libraries and Python are installed.
* Next, make sure ``data-values.csv`` is present in the same directory as the notebook.
* Open a terminal in that directory and start your notebook server (for Jupyter, run ``jupyter notebook``). For installation instructions, visit [Jupyter Notebook](https://jupyter.org/) or the website of your preferred notebook environment.
* In case of Jupyter, open the notebook and click on 'Cell', then 'Run All'.

In [1]:
```python
from pyspark.sql import SparkSession

# Create a Spark session (or reuse the one already running)
spark = SparkSession.builder.appName("pysparkdf").getOrCreate()
```

In [2]:
```python
# Read the CSV; the first line holds the column names
df = spark.read.option("header", "true").csv("data-values.csv")

# Check the top 10 rows
df.show(10)
```

```
+--------------------+------------+----------------+--------------------+--------------------+--------------------+
|                uuid|price_string|price_string_unf|        product_type|             level_1|            category|
+--------------------+------------+----------------+--------------------+--------------------+--------------------+
|638744a4-b0ae-416...|        null|            null|TGFwdG9wIENvdmVyc...|     TGFwdG9wIENhc2U|     RWxlY3Ryb25pY3M|
|ab313969-02cc-48b...|        null|            null|QmFraW5nIEN1cHMgY...|QmFraW5nIE1hdHMgL...|a2l0Y2hpbmcgYW5kI...|
|acbd66ff-79f8-467...|      $19.95|            null|R3VtbWllcyB2aXRhb...|SW1tdW5pdHkgZ3Vtb...|            SGVhbHRo|
|963915d6-b2e3-409...|      $92.00|            null|            U2VydW1z|      RmFjZSBTZXJ1bQ|YmVhdXR5IGFuZCBwZ...|
|b5b68f3c-b1e0-40e...|       11.50|            null|RWF0aW5nIFV0ZW5za...|      Q2hvcHN0aWNrcw|a2l0Y2hpbmcgYW5kI...|
|389d9f75-cc3f-4bd...|        null|            null|TmF0dXJhbCBTd2Vld...
```

##### Task 1:
* To get the products which don't have a price, I will use ``pyspark.sql.Column.isNull`` to filter the rows with ``price_string`` as ``null``.

In [3]:
```python
# Get the products without prices
df_task1 = df.where(df['price_string'].isNull())

# Check the top 10 rows to see if it works
df_task1.show(10)
```

```
+--------------------+------------+----------------+--------------------+--------------------+--------------------+
|                uuid|price_string|price_string_unf|        product_type|             level_1|            category|
+--------------------+------------+----------------+--------------------+--------------------+--------------------+
|638744a4-b0ae-416...|        null|            null|TGFwdG9wIENvdmVyc...|     TGFwdG9wIENhc2U|     RWxlY3Ryb25pY3M|
|ab313969-02cc-48b...|        null|            null|QmFraW5nIEN1cHMgY...|QmFraW5nIE1hdHMgL...|a2l0Y2hpbmcgYW5kI...|
|389d9f75-cc3f-4bd...|        null|            null|TmF0dXJhbCBTd2Vld...|      TW9uayBGcnVpdA|        Z3JvY2VyaWVz|
|1f2766ec-0a27-43f...|        null|            null|U2NydWJzIGFuZCBjb...|            U2NydWJz|SG91c2Vob2xkIGFuZ...|
|7bad39f5-74b9-461...|        null|            null|UGx1cyBzaXplIHdlY...|             RHJlc3M|Q2xvdGhpbmcgYW5kI...|
|524886b5-7cc2-4a5...|        null|            null| TWF0ZXJuaXR5IFBhZHM
```

##### Task 2:
* To count the products with and without prices for each value of Product Type, Category, and Level 1, I first removed the duplicate values in each of these columns.

In [4]:
```python
# Keep one row per unique value of each column
df_c = df.drop_duplicates(subset=['category'])
df_p = df.drop_duplicates(subset=['product_type'])
df_l = df.drop_duplicates(subset=['level_1'])
```

In [5]:
```python
# Check the first 10 rows of the unique-category DataFrame; df_p and df_l can be checked the same way
df_c.show(10)
```

```
+--------------------+------------+----------------+--------------------+--------------------+--------------------+
|                uuid|price_string|price_string_unf|        product_type|             level_1|            category|
+--------------------+------------+----------------+--------------------+--------------------+--------------------+
|13724827-979f-401...|        null| Current price: |                null|                null|                null|
|8e598c24-db06-48d...|         $75|            null|V2ludGVyIEFjY2Vzc...|            QmVhbmll|Q2xvdGhpbmcgYW5kI...|
|778768c3-7c21-44a...|       $6.50|            null|TmF0dXJhbCBQaWdtZ...|TmF0dXJhbCBQaWdtZ...|    QXJ0IHN1cHBsaWVz|
|07941b4b-2995-488...|        null|            null|        QmFieSBUb3lz|     RmFicmljIHRveXM|         QmFieWNhcmU|
|a571890c-4c5d-4c9...|      $24.95|            null|U21hcnQgV2F0Y2ggU...|U21hcnQgV2F0Y2ggU...|     RWxlY3Ryb25pY3M|
|9599f1a9-d406-43e...|      $24.99|            null|TW9wcyBhbmQgYnJvb21z
```

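* All the unique values are then extracted into a list.
* For each value, ``pyspark.sql.DataFrame.count`` gives the total number of rows and the number of rows left after ``na.drop`` removes the ``null`` prices; the difference between the two is the number of products without a price.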

In [6]:
```python
cats = [row['category'] for row in df_c.collect()]
prod_types = [row['product_type'] for row in df_p.collect()]
levels = [row['level_1'] for row in df_l.collect()]
```

In [7]:
```python
# Count the products with and without prices for the first 10 values of category
# (cats[0] is None; comparing a column with None matches no rows, hence 0 and 0)
for cat in cats[0:10]:
    df_or = df.filter(df['category'] == cat)
    count_full = df_or.count()
    # na.drop removes the rows whose price_string is null, leaving the priced products
    count_with = df_or.na.drop(subset=['price_string']).count()
    count_without = count_full - count_with
    print('cat: ', cat, ', with prices: ', count_with, ', without prices:', count_without)
```

```
cat:  None , with prices:  0 , without prices: 0
cat:  Q2xvdGhpbmcgYW5kIEFjY2Vzc29yaWVz , with prices:  1840000 , without prices: 2030000
cat:  QXJ0IHN1cHBsaWVz , with prices:  190000 , without prices: 130000
cat:  QmFieWNhcmU , with prices:  90000 , without prices: 550000
cat:  RWxlY3Ryb25pY3M , with prices:  110000 , without prices: 210000
cat:  SG91c2Vob2xkIGFuZCBDbGVhbmluZw , with prices:  700000 , without prices: 870000
cat:  SGVhbHRo , with prices:  290000 , without prices: 700000
cat:  VG95cyBhbmQgR2FtZXM , with prices:  190000 , without prices: 600000
cat:  VG9vbHMgYW5kIGhvbWUgaW1wcm92ZW1lbnQ , with prices:  200000 , without prices: 320000
cat:  YmVhdXR5IGFuZCBwZXJzb25hbCBjYXJl , with prices:  750000 , without prices: 720000
```


In [8]:
```python
# Count the products with and without prices for the first 10 values of product_type
for prod in prod_types[0:10]:
    df_or = df.filter(df['product_type'] == prod)
    count_full = df_or.count()
    # na.drop removes the rows whose price_string is null, leaving the priced products
    count_with = df_or.na.drop(subset=['price_string']).count()
    count_without = count_full - count_with
    print('prod_type: ', prod, ', with prices: ', count_with, ', without prices:', count_without)
```

```
prod_type:  None , with prices:  0 , without prices: 0
prod_type:  Q0FUIExJVFRFUg , with prices:  0 , without prices: 30000
prod_type:  Q29mZmVlIEJlYW5z , with prices:  30000 , without prices: 50000
prod_type:  Q29mZmVlIEZpbHRlcnM , with prices:  0 , without prices: 10000
prod_type:  Q29uZGl0aW9uZXIgSGFpcg , with prices:  10000 , without prices: 20000
prod_type:  Q29va2llcw , with prices:  0 , without prices: 30000
prod_type:  Q29va2luZyBPaWw , with prices:  70000 , without prices: 10000
prod_type:  Q29va2luZyBVdGVuc2lscw , with prices:  110000 , without prices: 20000
prod_type:  Q2F0IHRveXMgcGV0 , with prices:  0 , without prices: 20000
prod_type:  Q2FuZGxlcy9GcmFnbmFuY2Vz , with prices:  40000 , without prices: 20000
```


In [9]:
```python
# Count the products with and without prices for the first 10 values of level_1
for level in levels[0:10]:
    df_or = df.filter(df['level_1'] == level)
    count_full = df_or.count()
    # na.drop removes the rows whose price_string is null, leaving the priced products
    count_with = df_or.na.drop(subset=['price_string']).count()
    count_without = count_full - count_with
    print('level: ', level, ', with prices: ', count_with, ', without prices:', count_without)
```

```
level:  None , with prices:  0 , without prices: 0
level:  MTAgQ3Vw , with prices:  10000 , without prices: 0
level:  MjAgQ3Vw , with prices:  10000 , without prices: 0
level:  MjAgUGludHM , with prices:  10000 , without prices: 20000
level:  MzAgQ3Vw , with prices:  10000 , without prices: 0
level:  MzAtIDM1IFBpbnRz , with prices:  10000 , without prices: 10000
level:  NSBHYWxsb24 , with prices:  10000 , without prices: 0
level:  NTAgUGludHM , with prices:  10000 , without prices: 20000
level:  NiBDdXA , with prices:  10000 , without prices: 0
level:  OCBDdXA , with prices:  10000 , without prices: 0
```


##### Task 3:
* The first ``filter`` removes the products whose ``price_string`` is ``null``.
* The second ``filter`` keeps only the prices that begin with a dollar sign, as only those can be split into the new columns.
* ``select`` builds a new DataFrame with the original columns plus a constant ``currency`` column and a ``value`` column. Note that ``substr(2, 5)`` keeps at most five characters after the dollar sign, so longer prices are truncated (for example, ``$148.00`` becomes ``148.0``).

In [10]:
```python
# Import lit to fill the constant currency column
from pyspark.sql.functions import lit

df_task3 = (df.filter(df['price_string'].isNotNull())
              .filter(df['price_string'].startswith('$'))
              # keep the original columns, add a constant currency and the text after '$'
              # (substr(2, 5) keeps at most five characters, so longer prices are truncated)
              .select(*df.columns, lit('$').alias('currency'),
                      df['price_string'].substr(2, 5).alias('value')))

# Check the top 10 rows to see if it works
df_task3.show(10)
```

```
+--------------------+------------+----------------+--------------------+--------------------+--------------------+--------+-----+
|                uuid|price_string|price_string_unf|        product_type|             level_1|            category|currency|value|
+--------------------+------------+----------------+--------------------+--------------------+--------------------+--------+-----+
|acbd66ff-79f8-467...|      $19.95|            null|R3VtbWllcyB2aXRhb...|SW1tdW5pdHkgZ3Vtb...|            SGVhbHRo|       $|19.95|
|963915d6-b2e3-409...|      $92.00|            null|            U2VydW1z|      RmFjZSBTZXJ1bQ|YmVhdXR5IGFuZCBwZ...|       $|92.00|
|9599f1a9-d406-43e...|      $24.99|            null|TW9wcyBhbmQgYnJvb21z|                TW9w|SG91c2Vob2xkIGFuZ...|       $|24.99|
|35799087-f6f4-4ca...|     $148.00|            null|    V29tZW5zIFBhbnRz|V29tZW5zIFJlZ3VsY...|Q2xvdGhpbmcgYW5kI...|       $|148.0|
|9b3f553e-ee4c-4e1...|         $89|            null|    V29tZW5zIFBhbnRz|  V29tZW5z
```

##### Task 4:
* Iterate over each category and use ``pyspark.sql.DataFrame.agg`` to get average prices.

In [11]:
```python
# Go through 10 categories, skipping cats[0] because it is None
for cat in cats[1:11]:
    df_temp = df_task3.filter(df_task3['category'] == cat)
    avg = df_temp.agg({'value': 'avg'})
    avg_val = avg.head()[0]
    print('cat: ', cat, ', average price: ', avg_val)
```

```
cat:  Q2xvdGhpbmcgYW5kIEFjY2Vzc29yaWVz , average price:  125.24032786885073
cat:  QXJ0IHN1cHBsaWVz , average price:  19.467368421053862
cat:  QmFieWNhcmU , average price:  163.67888888888473
cat:  RWxlY3Ryb25pY3M , average price:  60.25000000000101
cat:  SG91c2Vob2xkIGFuZCBDbGVhbmluZw , average price:  75.73811594210714
cat:  SGVhbHRo , average price:  32.55758620688889
cat:  VG95cyBhbmQgR2FtZXM , average price:  37.115333333335165
cat:  VG9vbHMgYW5kIGhvbWUgaW1wcm92ZW1lbnQ , average price:  82.20799999999528
cat:  YmVhdXR5IGFuZCBwZXJzb25hbCBjYXJl , average price:  34.745066666667455
cat:  Z3JvY2VyaWVz , average price:  10.541666666662943
```


### Remarks:
* Most of the examples use only the first 10 values, but the loops can run over every value in the DataFrame without changes. Doing so takes much longer, because each iteration launches separate Spark jobs that scan the data again.