### What is Apache Spark?

Apache Spark is currently one of the most popular frameworks for large-scale data processing. It is an open-source analytics engine that processes data in a distributed manner.
While it is mostly used for data processing, it is also very useful for building machine learning applications thanks to its machine learning library, MLlib. Spark is maintained by the Apache Software Foundation, hence the name Apache Spark.

### Why Spark?
While pandas works well on small to mid-size data (approx. 5-10 GB, depending on the machine's memory), it quickly becomes inefficient when the data becomes very large.
To process very large datasets, we need more computing capacity than a single machine can provide, so we use a cluster of computing machines called nodes. This means we need an application that can run on multiple machines (nodes) to process the data. This is what Spark is designed for.

Spark splits large data into manageable pieces (partitions), distributes them across multiple nodes in a cluster, processes the pieces in parallel on the individual nodes, and finally combines the results.
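The split, process, combine idea can be sketched in plain Python as an analogy. This is a minimal sketch assuming the data fits in one process: `split_into_partitions` and `process_partition` are hypothetical helpers, and the partitions are processed one after another here, whereas Spark would send them to different executors.

```python
from functools import reduce


def split_into_partitions(data, num_partitions):
    """Split a list into roughly equal chunks (hypothetical helper)."""
    size = -(-len(data) // num_partitions)  # ceiling division
    return [data[i:i + size] for i in range(0, len(data), size)]


def process_partition(partition):
    """Work done independently on one chunk: here, a sum of squares."""
    return sum(x * x for x in partition)


data = list(range(1, 101))
partitions = split_into_partitions(data, 4)

# Process step: each chunk is handled on its own (in Spark, on different nodes)
partial_results = [process_partition(p) for p in partitions]

# Combine step: merge the partial results into the final answer
total = reduce(lambda a, b: a + b, partial_results)
print(len(partitions), total)
```

In PySpark the same computation could be written as `spark.sparkContext.parallelize(data, 4).map(lambda x: x * x).sum()`, with Spark handling the splitting, scheduling, and combining.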

Spark supports several programming languages, including Python, Scala, Java, etc. In this class we will use Spark with Python, also known as PySpark.

### Spark Structured APIs
1. DataFrames
2. Datasets (Scala and Java only; in PySpark, DataFrames fill this role)
3. Spark SQL

### Spark Features
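1. In-memory computation
2. Distributed, parallel processing (for example, `SparkContext.parallelize` distributes a local collection across the cluster)
3. Works with several cluster managers (Spark standalone, YARN, Mesos, Kubernetes, etc.)
4. Fault tolerance
5. Lazy evaluation
6. Caching & persistence
7. Built-in query optimization (the Catalyst optimizer) when using DataFrames
8. Support for ANSI SQL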

### Running Spark Application
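A Spark application runs as a driver program, which coordinates the work done on the cluster. Inside that program, the SparkSession is the entry point to Spark: we use it to connect to a cluster (or run locally), create DataFrames, and run SQL queries.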

First, install PySpark if needed, then import SparkSession and create a session:

In [1]:
%pip install pyspark


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.3.1[0m[39;49m -> [0m[32;49m25.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [2]:
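from pyspark.sql import SparkSession
# Column helpers used throughout the notebook. Note: importing `sum` this way
# shadows Python's built-in sum() for the rest of the session.
from pyspark.sql.functions import expr, col, sum, avg, count, when, lit

# Create a new SparkSession named "sparkIntro", or reuse the existing one
spark = SparkSession.builder.appName("sparkIntro").getOrCreate()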



### Spark DataFrame
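The Spark DataFrame API lets us work with data in a structured, tabular format. A Spark DataFrame behaves like a pandas DataFrame, with a few differences: it is distributed across the cluster, it is immutable (transformations return a new DataFrame), and it is evaluated lazily. Let's load some data into a DataFrame: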

#### Read from a CSV File into a Spark Dataframe

In [4]:
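# Read in the data: header=True takes column names from the first row,
# inferSchema=True lets Spark guess each column's type
stock_df = spark.read.csv('Data/stock_data.csv', header=True, inferSchema=True)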

                                                                                

In [5]:
stock_df.show(10)

+----------+--------------------+----------+--------+------+
|    Ticker|                Name|    Volume|   Price|Change|
+----------+--------------------+----------+--------+------+
|  ABBEYBDS|Abbey Mortgage Ba...|   224,580|    1.52| -0.16|
|  ABCTRANS|Associated Bus Co...|   370,378|    0.34| -0.02|
|   ACADEMY|       Academy Press|     4,760|    1.99|   0.0|
|ACCESSCORP| Access Holdings Plc|41,023,827|   14.25|   0.0|
| AFRINSURE|African Alliance ...|      NULL|    0.20|  NULL|
|  AFRIPRUD|Africa Prudential...| 1,332,872|    6.40|  0.15|
| AFROMEDIA|       Afromedia Plc|      NULL|    0.20|  NULL|
|     AIICO| Aiico Insurance Plc|19,952,840|    0.74|  0.04|
|AIRTELAFRI|   Airtel Africa Plc|    40,441|1,289.00|   0.0|
|      ALEX|Aluminium Extrusi...|      NULL|    6.50|  NULL|
+----------+--------------------+----------+--------+------+
only showing top 10 rows



In [7]:
# limit(n) returns a new DataFrame with at most n rows, so show(10) displays only 5 here
stock_df.limit(5).show(10)

+----------+--------------------+----------+-----+------+
|    Ticker|                Name|    Volume|Price|Change|
+----------+--------------------+----------+-----+------+
|  ABBEYBDS|Abbey Mortgage Ba...|   224,580| 1.52| -0.16|
|  ABCTRANS|Associated Bus Co...|   370,378| 0.34| -0.02|
|   ACADEMY|       Academy Press|     4,760| 1.99|   0.0|
|ACCESSCORP| Access Holdings Plc|41,023,827|14.25|   0.0|
| AFRINSURE|African Alliance ...|      NULL| 0.20|  NULL|
+----------+--------------------+----------+-----+------+



In [8]:
# Check data schema
stock_df.schema

StructType([StructField('Ticker', StringType(), True), StructField('Name', StringType(), True), StructField('Volume', StringType(), True), StructField('Price', StringType(), True), StructField('Change', DoubleType(), True)])

In [9]:
# Using PrintSchema
stock_df.printSchema()

root
 |-- Ticker: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Price: string (nullable = true)
 |-- Change: double (nullable = true)



In [10]:
# Describe data
stock_df.describe().show()


+-------+----------+--------------------+------------------+-----------------+--------------------+
|summary|    Ticker|                Name|            Volume|            Price|              Change|
+-------+----------+--------------------+------------------+-----------------+--------------------+
|  count|       156|                 156|               121|              156|                 121|
|   mean|  Infinity|                NULL|             250.8|24.71836601307189|-0.03826446280991...|
| stddev|      NULL|                NULL|235.66756246883023|70.02011350561236|  0.3294128050687324|
|    min|  ABBEYBDS|AXA Mansard Insur...|         1,002,000|             0.20|                -2.0|
|    max|ZENITHBANK|     Zenith Bank Plc|           836,301|            86.00|                 1.4|
+-------+----------+--------------------+------------------+-----------------+--------------------+



                                                                                

In [14]:
# Select columns from the data using the select function
# (column names are case-insensitive by default, so 'name' resolves to the Name column)
stock_with_few_cols = stock_df.select(['Ticker', 'name', 'Price'])
stock_with_few_cols.show(5)

+----------+--------------------+-----+
|    Ticker|                name|Price|
+----------+--------------------+-----+
|  ABBEYBDS|Abbey Mortgage Ba...| 1.52|
|  ABCTRANS|Associated Bus Co...| 0.34|
|   ACADEMY|       Academy Press| 1.99|
|ACCESSCORP| Access Holdings Plc|14.25|
| AFRINSURE|African Alliance ...| 0.20|
+----------+--------------------+-----+
only showing top 5 rows



In [15]:
# Adding a new column: the price after a 5% deduction
# (Price is stored as a string, so Spark implicitly casts it to a number here)
transformed_df = stock_df.withColumn('TaxDeduction', stock_df['Price'] - (stock_df['Price']*0.05))
transformed_df.show(5)

+----------+--------------------+----------+-----+------+------------+
|    Ticker|                Name|    Volume|Price|Change|TaxDeduction|
+----------+--------------------+----------+-----+------+------------+
|  ABBEYBDS|Abbey Mortgage Ba...|   224,580| 1.52| -0.16|       1.444|
|  ABCTRANS|Associated Bus Co...|   370,378| 0.34| -0.02|       0.323|
|   ACADEMY|       Academy Press|     4,760| 1.99|   0.0|      1.8905|
|ACCESSCORP| Access Holdings Plc|41,023,827|14.25|   0.0|     13.5375|
| AFRINSURE|African Alliance ...|      NULL| 0.20|  NULL|        0.19|
+----------+--------------------+----------+-----+------+------------+
only showing top 5 rows



In [17]:
# Create column from a literal value
transformed_df.withColumn('Identifier', lit('stock')).show(5)

+----------+--------------------+----------+-----+------+------------+----------+
|    Ticker|                Name|    Volume|Price|Change|TaxDeduction|Identifier|
+----------+--------------------+----------+-----+------+------------+----------+
|  ABBEYBDS|Abbey Mortgage Ba...|   224,580| 1.52| -0.16|       1.444|     stock|
|  ABCTRANS|Associated Bus Co...|   370,378| 0.34| -0.02|       0.323|     stock|
|   ACADEMY|       Academy Press|     4,760| 1.99|   0.0|      1.8905|     stock|
|ACCESSCORP| Access Holdings Plc|41,023,827|14.25|   0.0|     13.5375|     stock|
| AFRINSURE|African Alliance ...|      NULL| 0.20|  NULL|        0.19|     stock|
+----------+--------------------+----------+-----+------+------------+----------+
only showing top 5 rows



In [18]:
# Creating a conditional column using the when expression
# (assign the DataFrame first, then call show(); show() itself returns None)
ranked_stock = stock_df.select(
    "Ticker",
    "Name",
    when(col("Price") <= 500, "Poor stock")
    .otherwise("High performance stock")
    .alias("StockRanking"),
)
ranked_stock.show()

+----------+--------------------+--------------------+
|    Ticker|                Name|        StockRanking|
+----------+--------------------+--------------------+
|  ABBEYBDS|Abbey Mortgage Ba...|          Poor stock|
|  ABCTRANS|Associated Bus Co...|          Poor stock|
|   ACADEMY|       Academy Press|          Poor stock|
|ACCESSCORP| Access Holdings Plc|          Poor stock|
| AFRINSURE|African Alliance ...|          Poor stock|
|  AFRIPRUD|Africa Prudential...|          Poor stock|
| AFROMEDIA|       Afromedia Plc|          Poor stock|
|     AIICO| Aiico Insurance Plc|          Poor stock|
|AIRTELAFRI|   Airtel Africa Plc|High performance ...|
|      ALEX|Aluminium Extrusi...|          Poor stock|
|    ARBICO|          Arbico Plc|          Poor stock|
|    ARDOVA|          Ardova Plc|          Poor stock|
|ASOSAVINGS| Aso Savings & Loans|          Poor stock|
| AUSTINLAZ|Austin Laz & Company|          Poor stock|
|     BAPLC|Briclinks Africa Plc|          Poor stock|
|    BERGE

In [21]:
# Dropping columns of a dataframe
transformed_df = transformed_df.drop('Change')
transformed_df.show(3)

+--------+--------------------+-------+-----+------------+
|  Ticker|                Name| Volume|Price|TaxDeduction|
+--------+--------------------+-------+-----+------------+
|ABBEYBDS|Abbey Mortgage Ba...|224,580| 1.52|       1.444|
|ABCTRANS|Associated Bus Co...|370,378| 0.34|       0.323|
| ACADEMY|       Academy Press|  4,760| 1.99|      1.8905|
+--------+--------------------+-------+-----+------------+
only showing top 3 rows



In [22]:
# Rename a column 
transformed_df = transformed_df.withColumnRenamed('Volume', 'StockVolume')
transformed_df.show(3)

+--------+--------------------+-----------+-----+------------+
|  Ticker|                Name|StockVolume|Price|TaxDeduction|
+--------+--------------------+-----------+-----+------------+
|ABBEYBDS|Abbey Mortgage Ba...|    224,580| 1.52|       1.444|
|ABCTRANS|Associated Bus Co...|    370,378| 0.34|       0.323|
| ACADEMY|       Academy Press|      4,760| 1.99|      1.8905|
+--------+--------------------+-----------+-----+------------+
only showing top 3 rows



In [23]:
## Filter dataframe using conditionals
filtered_df = stock_df.filter('Price >= 100')
filtered_df.show(10)

+--------+--------------------+---------+------+------+
|  Ticker|                Name|   Volume| Price|Change|
+--------+--------------------+---------+------+------+
|BUAFOODS|           BUA Foods|  240,781|135.75|   0.0|
| DANGCEM|      Dangote Cement|  256,585|284.00|   0.0|
|  GEREGU|    Geregu Power Plc|   15,499|312.00|   0.0|
|    MTNN|         MTN Nigeria|2,097,876|274.00|   0.0|
|    NESF|Nigeria Energy Se...|     NULL|552.20|  NULL|
|OKOMUOIL|      Okomu Oil Palm|  338,508|236.40|   0.0|
|  PRESCO|          Presco Plc|  245,354|180.00|   0.0|
|   TOTAL|       Total Nigeria|  446,267|336.70|   0.0|
+--------+--------------------+---------+------+------+



In [24]:
## Filtering with the NOT conditional
stock_df.filter(~(stock_df['Price'] >= 100)).show(10)

+----------+--------------------+----------+-----+------+
|    Ticker|                Name|    Volume|Price|Change|
+----------+--------------------+----------+-----+------+
|  ABBEYBDS|Abbey Mortgage Ba...|   224,580| 1.52| -0.16|
|  ABCTRANS|Associated Bus Co...|   370,378| 0.34| -0.02|
|   ACADEMY|       Academy Press|     4,760| 1.99|   0.0|
|ACCESSCORP| Access Holdings Plc|41,023,827|14.25|   0.0|
| AFRINSURE|African Alliance ...|      NULL| 0.20|  NULL|
|  AFRIPRUD|Africa Prudential...| 1,332,872| 6.40|  0.15|
| AFROMEDIA|       Afromedia Plc|      NULL| 0.20|  NULL|
|     AIICO| Aiico Insurance Plc|19,952,840| 0.74|  0.04|
|      ALEX|Aluminium Extrusi...|      NULL| 6.50|  NULL|
|    ARBICO|          Arbico Plc|      NULL| 1.03|  NULL|
+----------+--------------------+----------+-----+------+
only showing top 10 rows



In [25]:
# Filtering using where and a single condition
stock_df.where(col("Ticker") == "AFROMEDIA").show(2)

+---------+-------------+------+-----+------+
|   Ticker|         Name|Volume|Price|Change|
+---------+-------------+------+-----+------+
|AFROMEDIA|Afromedia Plc|  NULL| 0.20|  NULL|
+---------+-------------+------+-----+------+



In [27]:
# Filtering using where and multiple conditions combined with & (AND)
stock_df.where((col("Price") >= 100) & (col("Price") <= 500)).show()

+--------+----------------+---------+------+------+
|  Ticker|            Name|   Volume| Price|Change|
+--------+----------------+---------+------+------+
|BUAFOODS|       BUA Foods|  240,781|135.75|   0.0|
| DANGCEM|  Dangote Cement|  256,585|284.00|   0.0|
|  GEREGU|Geregu Power Plc|   15,499|312.00|   0.0|
|    MTNN|     MTN Nigeria|2,097,876|274.00|   0.0|
|OKOMUOIL|  Okomu Oil Palm|  338,508|236.40|   0.0|
|  PRESCO|      Presco Plc|  245,354|180.00|   0.0|
|   TOTAL|   Total Nigeria|  446,267|336.70|   0.0|
+--------+----------------+---------+------+------+



In [23]:
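# Filtering using where and multiple conditions combined with | (OR);
# every numeric price satisfies at least one side, so this keeps all rows whose Price can be read as a number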
stock_df.where((col("Price") >= 100) | (col("Price") <= 500)).show()

+----------+--------------------+----------+------+------+
|    Ticker|                Name|    Volume| Price|Change|
+----------+--------------------+----------+------+------+
|  ABBEYBDS|Abbey Mortgage Ba...|   224,580|  1.52| -0.16|
|  ABCTRANS|Associated Bus Co...|   370,378|  0.34| -0.02|
|   ACADEMY|       Academy Press|     4,760|  1.99|   0.0|
|ACCESSCORP| Access Holdings Plc|41,023,827| 14.25|   0.0|
| AFRINSURE|African Alliance ...|      NULL|  0.20|  NULL|
|  AFRIPRUD|Africa Prudential...| 1,332,872|  6.40|  0.15|
| AFROMEDIA|       Afromedia Plc|      NULL|  0.20|  NULL|
|     AIICO| Aiico Insurance Plc|19,952,840|  0.74|  0.04|
|      ALEX|Aluminium Extrusi...|      NULL|  6.50|  NULL|
|    ARBICO|          Arbico Plc|      NULL|  1.03|  NULL|
|    ARDOVA|          Ardova Plc|   536,531| 16.90|   0.0|
|ASOSAVINGS| Aso Savings & Loans|      NULL|  0.50|  NULL|
| AUSTINLAZ|Austin Laz & Company|      NULL|  2.03|  NULL|
|     BAPLC|Briclinks Africa Plc|      NULL|  6.25|  NUL

In [29]:
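# Sorting the records of a DataFrame in descending order of Price
# (Price is a string column here, so the order is lexicographic rather than numeric)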
stock_df.orderBy(col('Price').desc()).show()

+----------+--------------------+----------+------+------+
|    Ticker|                Name|    Volume| Price|Change|
+----------+--------------------+----------+------+------+
| BUACEMENT|      BUA Cement Plc|   402,276| 86.00|   0.0|
|  RONCHESS|Ronchess Global R...|      NULL| 81.00|  NULL|
|UNIONDICON|    Union Dicon Salt|       500|  8.95|   0.0|
|       IMG|Industrial & Medi...|    20,399|  8.40|   0.0|
|    CONOIL|          Conoil Plc|    68,964| 79.00|   0.0|
|  GUINNESS|    Guinness Nigeria|   836,301| 77.70|   0.0|
|   SFSREIT|Skye Shelter Fund...|     5,380| 77.00|   0.0|
|GLAXOSMITH|Glaxo Smithkline ...| 1,227,845|  7.45| -0.05|
|       UBN|  Union Bank Nigeria|   282,283|  7.05| -0.05|
|       MRS|     MRS Oil Nigeria|    73,519| 68.75|   0.0|
|    NOTORE|Notore Chemical I...|      NULL| 62.50|  NULL|
|FIDELITYBK|   Fidelity Bank Plc|31,146,009|  6.94|  0.03|
| CUSTODIAN|Custodian & Allie...|   451,510|  6.70| -0.05|
|      ALEX|Aluminium Extrusi...|      NULL|  6.50|  NUL

In [30]:
# Checking for distinct count of values in a column
stock_df.select('Ticker').distinct().count()

156