## Full Process to Analyse Big Data Using Apache Spark and Python

### 1 _ Import Required Libraries

In [1]:
# First, check which Java version is installed: Spark runs on the JVM, so a JDK must be available.
!java --version

openjdk 11.0.17 2022-10-18
OpenJDK Runtime Environment (build 11.0.17+8-post-Ubuntu-1ubuntu218.04)
OpenJDK 64-Bit Server VM (build 11.0.17+8-post-Ubuntu-1ubuntu218.04, mixed mode, sharing)


In [2]:
# Choose between alternative Java installations; not needed when only one Java version is installed.
# NOTE(review): `--config` prompts interactively for a choice when several alternatives exist,
# which a notebook cell may not be able to answer; `update-alternatives --set java <path>` is the
# non-interactive form — confirm before relying on this cell.
!sudo update-alternatives --config java

There is only one alternative in link group java (providing /usr/bin/java): /usr/lib/jvm/java-11-openjdk-amd64/bin/java
Nothing to configure.


In [3]:
# Even that google colab have all needed requireds , there is still more to add
!pip install -q findspark
!pip install pyspark
!pip install google-cloud-bigquery

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 47 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 66.1 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=c28c6d1ecb98b74f810dc849a01338a53a7e6091cca39e12f26710b13ae4a45e
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [4]:
# Show the attached NVIDIA GPU (model, driver, memory usage), if any.
# NOTE(review): nothing in this notebook configures Spark to use a GPU, so this cell is
# informational only and does not by itself speed up the Spark work below.
!nvidia-smi

Tue Dec 20 09:00:22 2022       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 460.32.03    Driver Version: 460.32.03    CUDA Version: 11.2     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla T4            Off  | 00000000:00:04.0 Off |                    0 |
| N/A   66C    P0    31W /  70W |      0MiB / 15109MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces

### 2 _ Initialise the Spark Environment

In [5]:
# Let findspark locate the Spark installation so that `pyspark` can be imported.
# NOTE(review): pyspark is pip-installed above, so this step is presumably optional here — confirm.
import findspark
findspark.init()

In [6]:
# Next we can use the SparkSession
from pyspark.sql import SparkSession

In [7]:
# Next we need to import all pyspark sql manipulation functions
from pyspark.sql.functions import *

In [8]:
# Create the Spark session for this notebook (or reuse one that is already running).
spark = (
    SparkSession.builder
    .appName("LSTM App")
    .getOrCreate()
)

### 3 _ Creating and Reading the DataFrame

In [9]:
# The source table is a good example of Big Data: it holds more than 22 million rows.
# Only 100,000 of them are pulled here to keep the demo manageable.
# NOTE(review): LIMIT without ORDER BY does not guarantee which rows BigQuery returns,
# so the sample may differ between runs.
sql = (
    "SELECT * "
    "FROM `fair-sandbox-321621.iowa_liquor_retail_sales.sales` "
    "LIMIT 100000"
)

In [10]:
# the database is saved in Big Query so we are going to read it from there
from pandas.io import gbq

In [11]:
# Run the query on BigQuery, load the result into pandas, then convert it into a
# Spark DataFrame with createDataFrame.
# NOTE: despite its name, `rdd` holds a Spark DataFrame (not a raw RDD); the name is kept
# because every later cell refers to it.
# NOTE(review): pandas' BigQuery wrapper (`pandas.io.gbq`) is deprecated in newer pandas
# releases — confirm it is still available in the installed pandas version.
rdd = spark.createDataFrame(
    data=gbq.read_gbq(query=sql, project_id='fair-sandbox-321621'),
)

Please visit this URL to authorize this application: https://accounts.google.com/o/oauth2/auth?response_type=code&client_id=725825577420-unm2gnkiprugilg743tkbig250f4sfsj.apps.googleusercontent.com&redirect_uri=urn%3Aietf%3Awg%3Aoauth%3A2.0%3Aoob&scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fbigquery&state=IlzJfGTCrxF7xkT2i6Ci8786siskxf&prompt=consent&access_type=offline
Enter the authorization code: <redacted>


In [12]:
# Preview the first 5 rows. (`rdd` is a Spark DataFrame; DataFrames are built on top of RDDs.)
rdd.show(n=5)

+-----------------------+----------+------------+--------------------+----------------+---------------+--------+--------------------+-------------+----------+---------+--------------------+-------------+--------------------+-----------+--------------------+----+----------------+-----------------+-------------------+------------+------------+------------------+-------------------+
|invoice_and_item_number|      date|store_number|          store_name|         address|           city|zip_code|      store_location|county_number|    county| category|       category_name|vendor_number|         vendor_name|item_number|    item_description|pack|bottle_volume_ml|state_bottle_cost|state_bottle_retail|bottles_sold|sale_dollars|volume_sold_liters|volume_sold_gallons|
+-----------------------+----------+------------+--------------------+----------------+---------------+--------+--------------------+-------------+----------+---------+--------------------+-------------+--------------------+----------

### 4 _ Manipulating the Apache Spark DataFrame

In [13]:
# List the names of all DataFrame columns, in schema order.
list(rdd.columns)

['invoice_and_item_number',
 'date',
 'store_number',
 'store_name',
 'address',
 'city',
 'zip_code',
 'store_location',
 'county_number',
 'county',
 'category',
 'category_name',
 'vendor_number',
 'vendor_name',
 'item_number',
 'item_description',
 'pack',
 'bottle_volume_ml',
 'state_bottle_cost',
 'state_bottle_retail',
 'bottles_sold',
 'sale_dollars',
 'volume_sold_liters',
 'volume_sold_gallons']

In [14]:
# Print the dataset "shape" as (number of rows, number of columns).
n_rows = rdd.count()
n_cols = len(rdd.columns)
print((n_rows, n_cols))

(100000, 24)


In [15]:
# Print the DataFrame schema: every column with its Spark type and nullability.
rdd.printSchema() 

root
 |-- invoice_and_item_number: string (nullable = true)
 |-- date: date (nullable = true)
 |-- store_number: string (nullable = true)
 |-- store_name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- store_location: string (nullable = true)
 |-- county_number: string (nullable = true)
 |-- county: string (nullable = true)
 |-- category: string (nullable = true)
 |-- category_name: string (nullable = true)
 |-- vendor_number: string (nullable = true)
 |-- vendor_name: string (nullable = true)
 |-- item_number: string (nullable = true)
 |-- item_description: string (nullable = true)
 |-- pack: long (nullable = true)
 |-- bottle_volume_ml: long (nullable = true)
 |-- state_bottle_cost: double (nullable = true)
 |-- state_bottle_retail: double (nullable = true)
 |-- bottles_sold: long (nullable = true)
 |-- sale_dollars: double (nullable = true)
 |-- volume_sold_liters: double (nullable = tr

In [16]:
# Show Spark's basic summary statistics (`describe()`) for every column of the DataFrame.
rdd.describe().show()

+-------+-----------------------+-----------------+--------------------+--------------------+-------+-----------------+--------------------+------------------+------+------------------+---------------+------------------+--------------------+-----------------+--------------------+------------------+-----------------+------------------+-------------------+------------------+------------------+------------------+-------------------+
|summary|invoice_and_item_number|     store_number|          store_name|             address|   city|         zip_code|      store_location|     county_number|county|          category|  category_name|     vendor_number|         vendor_name|      item_number|    item_description|              pack| bottle_volume_ml| state_bottle_cost|state_bottle_retail|      bottles_sold|      sale_dollars|volume_sold_liters|volume_sold_gallons|
+-------+-----------------------+-----------------+--------------------+--------------------+-------+-----------------+-------------

In [17]:
# Drop exact duplicate rows. DataFrames are immutable: dropDuplicates() returns a NEW
# DataFrame, so the result is assigned back to `rdd` for the later cells to use it.
rdd = rdd.dropDuplicates()

DataFrame[invoice_and_item_number: string, date: date, store_number: string, store_name: string, address: string, city: string, zip_code: string, store_location: string, county_number: string, county: string, category: string, category_name: string, vendor_number: string, vendor_name: string, item_number: string, item_description: string, pack: bigint, bottle_volume_ml: bigint, state_bottle_cost: double, state_bottle_retail: double, bottles_sold: bigint, sale_dollars: double, volume_sold_liters: double, volume_sold_gallons: double]

In [18]:
# Count the null values of every column in ONE Spark job: each column becomes a
# count(when(is null)) aggregate, instead of running a separate filter().count() per column.
# The result is a {column name: null count} dict, in schema order.
rdd.select(
    [count(when(rdd[c].isNull(), 1)).alias(c) for c in rdd.columns]
).first().asDict()

{'invoice_and_item_number': 0,
 'date': 0,
 'store_number': 0,
 'store_name': 0,
 'address': 318,
 'city': 318,
 'zip_code': 318,
 'store_location': 9870,
 'county_number': 1912,
 'county': 728,
 'category': 92,
 'category_name': 159,
 'vendor_number': 0,
 'vendor_name': 0,
 'item_number': 0,
 'item_description': 0,
 'pack': 0,
 'bottle_volume_ml': 0,
 'state_bottle_cost': 0,
 'state_bottle_retail': 0,
 'bottles_sold': 0,
 'sale_dollars': 0,
 'volume_sold_liters': 0,
 'volume_sold_gallons': 0}

### 5 _ Creating the Data Warehouse

In [19]:
# Sanity check: count the rows whose sale amount is negative.
rdd.filter(col('sale_dollars') < 0).count()

0

In [20]:
# Replace nulls in the numeric measure columns below with 0. fillna() returns a NEW
# DataFrame, so the result is assigned back to `rdd`; otherwise the fill has no effect.
rdd = rdd.fillna(
    0,
    subset=[
        'bottle_volume_ml',
        'state_bottle_cost',
        'state_bottle_retail',
        'bottles_sold',
        'sale_dollars',
        'volume_sold_liters',
    ],
)

DataFrame[invoice_and_item_number: string, date: date, store_number: string, store_name: string, address: string, city: string, zip_code: string, store_location: string, county_number: string, county: string, category: string, category_name: string, vendor_number: string, vendor_name: string, item_number: string, item_description: string, pack: bigint, bottle_volume_ml: bigint, state_bottle_cost: double, state_bottle_retail: double, bottles_sold: bigint, sale_dollars: double, volume_sold_liters: double, volume_sold_gallons: double]

In [21]:
# Preview: for rows dated on the 1st of a month, sum a few sales measures per date and
# show the 6 earliest dates.
(
    rdd.filter(date_format(rdd.date, 'dd') == '01')
    .groupBy('date')
    .agg(
        sum('sale_dollars'),
        sum('state_bottle_cost'),
        sum('bottles_sold'),
        sum('volume_sold_liters'),
    )
    .orderBy('date')
    .show(6)
)

+----------+-----------------+----------------------+-----------------+-----------------------+
|      date|sum(sale_dollars)|sum(state_bottle_cost)|sum(bottles_sold)|sum(volume_sold_liters)|
+----------+-----------------+----------------------+-----------------+-----------------------+
|2012-02-01|          7159.83|     526.7800000000001|              556|                 465.76|
|2012-03-01|          8627.76|     625.6500000000001|              666|                  514.5|
|2012-05-01|6614.799999999999|    469.59999999999997|              610|                 530.26|
|2012-08-01|7141.290000000001|    441.72999999999996|              550|                 447.88|
|2012-10-01|7835.700000000001|                522.43|              678|                  522.0|
|2012-11-01|6936.000000000001|    471.52000000000004|              633|                  478.5|
+----------+-----------------+----------------------+-----------------+-----------------------+
only showing top 6 rows



In [22]:
# Split the date into a month-day key ("MM-dd", stored in `specefic_date`) and a year
# ("yyyy"), so that the same calendar day can be compared across years.
rdd = (
    rdd
    .withColumn("specefic_date", date_format(col("date"), 'MM-dd'))
    .withColumn("year", date_format(col("date"), 'yyyy'))
)
rdd.show(6)

+-----------------------+----------+------------+--------------------+----------------+---------------+--------+--------------------+-------------+----------+---------+--------------------+-------------+--------------------+-----------+--------------------+----+----------------+-----------------+-------------------+------------+------------+------------------+-------------------+-------------+----+
|invoice_and_item_number|      date|store_number|          store_name|         address|           city|zip_code|      store_location|county_number|    county| category|       category_name|vendor_number|         vendor_name|item_number|    item_description|pack|bottle_volume_ml|state_bottle_cost|state_bottle_retail|bottles_sold|sale_dollars|volume_sold_liters|volume_sold_gallons|specefic_date|year|
+-----------------------+----------+------------+--------------------+----------------+---------------+--------+--------------------+-------------+----------+---------+--------------------+-------

In [23]:
# Build the final table: for each date that falls on the 1st of a month, the totals of every
# sales measure, keyed by date, year and month-day, sorted chronologically.
# NOTE(review): only rows dated on the 1st are kept, so a month with no sales on its 1st is
# missing entirely (e.g. there is no 2012-04-01 row); confirm this is intended rather than a
# true monthly aggregation.
data = (
    rdd.filter(date_format(col('date'), 'dd') == '01')
    .groupBy('date', 'year', 'specefic_date')
    .agg(
        sum('sale_dollars').alias('total sale_dollars'),
        sum('state_bottle_cost').alias('total state_bottle_cost'),
        sum('state_bottle_retail').alias('total state_bottle_retail'),
        sum('bottles_sold').alias('total bottles_sold'),
        sum('volume_sold_liters').alias('total volume_sold_liters'),
    )
    .orderBy('date')
)

In [24]:
# Display the result of the manipulation above (Spark shows the first 20 rows).
data.show(n=20)

+----------+----+-------------+------------------+-----------------------+-------------------------+------------------+------------------------+
|      date|year|specefic_date|total sale_dollars|total state_bottle_cost|total state_bottle_retail|total bottles_sold|total volume_sold_liters|
+----------+----+-------------+------------------+-----------------------+-------------------------+------------------+------------------------+
|2012-02-01|2012|        02-01|           7159.83|      526.7800000000001|                   795.23|               556|                  465.76|
|2012-03-01|2012|        03-01|           8627.76|      625.6500000000001|        941.9399999999998|               666|                   514.5|
|2012-05-01|2012|        05-01| 6614.799999999999|     469.59999999999997|        705.8000000000001|               610|                  530.26|
|2012-08-01|2012|        08-01| 7141.290000000001|     441.72999999999996|        665.1800000000001|               550|           

### 6 _ Saving the Data for Cross-Validation

In [25]:
# Cross-validation splits the data into K folds, trains on K-1 of them and tests on the
# remaining one. The per-year files written below are presumably meant as those folds,
# so first check how many rows each year contains.
rows_per_year = data.groupBy('year').count().orderBy('year')
rows_per_year.show()

+----+-----+
|year|count|
+----+-----+
|2012|    6|
|2013|    5|
|2014|    5|
|2015|    6|
|2016|    8|
|2017|    8|
|2018|    7|
|2019|    8|
|2020|    7|
|2021|    9|
|2022|    8|
+----+-----+



In [26]:
# For every year (in ascending order), show up to 4 of that year's rows.
year_rows = data.groupBy('year').count().orderBy('year').collect()
for year in year_rows:
    data.filter(data['year'] == year['year']).show(4)

+----------+----+-------------+------------------+-----------------------+-------------------------+------------------+------------------------+
|      date|year|specefic_date|total sale_dollars|total state_bottle_cost|total state_bottle_retail|total bottles_sold|total volume_sold_liters|
+----------+----+-------------+------------------+-----------------------+-------------------------+------------------+------------------------+
|2012-02-01|2012|        02-01|           7159.83|      526.7800000000001|                   795.23|               556|                  465.76|
|2012-03-01|2012|        03-01|           8627.76|      625.6500000000001|        941.9399999999998|               666|                   514.5|
|2012-05-01|2012|        05-01| 6614.799999999999|     469.59999999999997|        705.8000000000001|               610|                  530.26|
|2012-08-01|2012|        08-01| 7141.290000000001|     441.72999999999996|        665.1800000000001|               550|           

In [27]:
# Save one CSV output (with a header row) per year under /content/Lstm_Validation_data/.
# Spark writes each "<year>_sells_dataset.csv" path as a DIRECTORY of part files.
# mode('overwrite') makes the cell safe to re-run: Spark's default save mode raises an
# error when the output path already exists.
for year in data.groupBy('year').count().sort('year').collect():
    year_data = data.where(data.year == year.year)
    (
        year_data.write
        .mode('overwrite')
        .option('header', True)
        .csv(f"/content/Lstm_Validation_data/{year.year}_sells_dataset.csv")
    )

In [28]:
# The complete table will probably be needed too, so save it as one more CSV output
# (written by Spark as a directory of part files, with a header row).
# mode('overwrite') lets the cell be re-run; Spark's default mode fails if the path exists.
(
    data.write
    .mode('overwrite')
    .option('header', True)
    .csv("/content/Lstm_All_data/whole_sells_dataset.csv")
)

In [29]:
# Finish by stopping the Spark session now that all outputs have been written.
spark.stop()