FindSpark: This Python library helps locate the Spark installation and configure the environment variables, allowing Python to interact seamlessly with Apache Spark.

PySpark: PySpark is the Python API for Apache Spark, enabling Python applications to leverage Spark's distributed data processing capabilities. py4j is a dependency that facilitates communication between Python and Java and is essential for PySpark to function.

In [None]:
!pip install findspark
!pip install pyspark
!pip install py4j


Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1
Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488493 sha256=027ea74e489f2ab43f95cbe2ed94a05dcf976084248f3e6650c3ac992799c08a
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


SparkSession: The main entry point for DataFrame and SQL operations in Spark, providing unified APIs for Spark functionalities.
SparkContext: The entry point for RDD operations, used here for backwards compatibility or specific RDD manipulations. For new code, prefer SparkSession.

In [None]:
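import os

import findspark
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# SparkSession is the entry point for DataFrame and SQL work.
spark=SparkSession.builder.master('local').appName('rdd_demo').getOrCreate()

# The session already owns a SparkContext, so getOrCreate() returns that one;
# conf is only applied when no context exists yet.
conf=SparkConf().setMaster('local').setAppName('data')
sc=SparkContext.getOrCreate(conf)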


inferschema='false': No automatic type inference; all data will be read as strings.
header='true': The first row of the CSV file is used for column names.

In [None]:
data=spark\
.read\
.option('inferschema','false')\
.option('header','true')\
.csv("2015-summary.csv")

In [None]:
print(data)
data
print(type(data))

DataFrame[DEST_COUNTRY_NAME: string, ORIGIN_COUNTRY_NAME: string, count: string]
<class 'pyspark.sql.dataframe.DataFrame'>
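
To make the effect of inferschema='false' concrete outside Spark, here is a minimal pure-Python sketch using the standard csv module, which likewise performs no type inference. The three sample rows are copied from the 2015-summary.csv preview printed further down in this notebook; the variable names are illustrative only.

```python
import csv
import io

# Three rows copied from the 2015-summary.csv preview, held in memory for the sketch.
sample = io.StringIO(
    "DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count\n"
    "United States,Romania,15\n"
    "United States,Croatia,1\n"
    "United States,Ireland,344\n"
)

# DictReader takes column names from the first row (like header='true')
# and leaves every value as a string (like inferschema='false').
rows = list(csv.DictReader(sample))
print(type(rows[0]["count"]))

# Numeric work therefore needs an explicit cast.
total = sum(int(row["count"]) for row in rows)
print(total)
```

This is why the later RDD cells wrap values in float() before comparing them.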


Repartitioning: Repartitioning changes the number of partitions in an RDD and can be useful for tuning performance to the size of the data and the resources available. Note that repartition() performs a full shuffle of the data.

In [None]:
data=spark\
.read\
.option('inferschema','false')\
.option('header','true')\
.csv("2015-summary.csv")
data=data.toDF("dest","source","count").rdd
print("the count of data is: ", data.count())
print(type(data))
print(data.take(1))
data.getNumPartitions()
data=data.repartition(5)
print("the number of partitions is: ", data.getNumPartitions())
data=data.repartition(1)
print("the number of partitions is: ", data.getNumPartitions())

the count of data is:  256
<class 'pyspark.rdd.RDD'>
[Row(dest='United States', source='Romania', count='15')]
the number of partitions is:  5
the number of partitions is:  1


In [None]:
import pandas as pd
data3=pd.read_csv('2015-summary.csv')
print(data3.head())
print(type(data3))

data4=spark.createDataFrame(data3).rdd
print(data4.take(4))
print(type(data4))

  DEST_COUNTRY_NAME ORIGIN_COUNTRY_NAME  count
0     United States             Romania     15
1     United States             Croatia      1
2     United States             Ireland    344
3             Egypt       United States     15
4     United States               India     62
<class 'pandas.core.frame.DataFrame'>
[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15), Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1), Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344), Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15)]
<class 'pyspark.rdd.RDD'>


This operation filters for records where the closing price of a stock is greater than the opening price, which can be used to identify positive trading days or analyze stock performance.

In [None]:
ndata=spark.read.format('csv').option('inferschema', 'false').option('header', 'true') \
  .load('RELIANCE.csv')
ndata=ndata.toDF('Date', 'Open', 'High', 'Low', 'Last', 'Close', 'Volume', 'Turnover').rdd \
  .map(lambda row: (row[0], row[1], row[5]))
print(ndata.count())
print(ndata.take(2))


ndata=ndata.filter(lambda row: float(row[2]) > float(row[1]))
print(ndata.count())
print(ndata.take(4))

5
[('2022-01-01', '100', '10000'), ('2022-01-02', '200', '20000')]
5
[('2022-01-01', '100', '10000'), ('2022-01-02', '200', '20000'), ('2022-01-03', '300', '30000'), ('2022-01-04', '400', '40000')]


In [None]:
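def pl(line):
  # Split one raw CSV line into fields and keep three of them.
  field=line.split(',')
  dat=field[0]
  open_price=field[1]   # renamed to avoid shadowing the built-in open()
  volume=field[4]       # note: with the column names used above, index 4 is 'Last' and 'Volume' is index 6
  return (dat, open_price, volume)

# textFile reads raw lines, so the header row is parsed like any other line.
mdata=spark.sparkContext.textFile('RELIANCE.csv')
mdata=mdata.map(pl)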

HighClose checks whether the 'Close' price is greater than the 'Open' price and returns a Boolean.
The CSV file is read into an RDD, and the Date, Open, and Close columns are selected.
The cell counts the rows, applies the filter, and prints the filtered results.

In [None]:
def HighClose(row):
    # row is (Date, Open, Close); the values are strings, so cast before comparing.
    return float(row[2]) > float(row[1])

o_sdt = spark.read.format("CSV").option("header", "true").load('RELIANCE.csv')
o_sdt = o_sdt.toDF("Date", "Open", "High", "Low", "Last", "Close", "Volume", "Turnover").rdd \
    .map(lambda row: (row[0], row[1], row[5]))
print(o_sdt.count())
o_sdt = o_sdt.filter(lambda row: HighClose(row))
print(o_sdt.take(5))
print(type(o_sdt))
print(o_sdt.count())


5
[('2022-01-01', '100', '10000'), ('2022-01-02', '200', '20000'), ('2022-01-03', '300', '30000'), ('2022-01-04', '400', '40000'), ('2022-01-05', '500', '50000')]
<class 'pyspark.rdd.PipelinedRDD'>
5


In [None]:
def to_mil(row):
  # Values are strings because inferschema is 'false', so cast before rounding.
  return (row[0], row[1], row[5], round(float(row[3]), 0))

kdata=spark.read.format('csv').option('inferschema','false').option('header','true').load('RELIANCE.csv')
# Keep the full rows here: to_mil indexes columns 3 (Low) and 5 (Close) itself.
kdata=kdata.toDF("Date", "Open", "High", "Low", "Last", "Close", "Volume", "Turnover").rdd

print(kdata.count())
kdata=kdata.map(to_mil)
print(type(kdata))

5
<class 'pyspark.rdd.PipelinedRDD'>


In [None]:
def chan(lines):
  lines=lines.lower()
  lines=lines.split(' ')
  return lines
s='sherlock_holmes.txt'
input_file=sc.textFile(s, 6)

print(input_file.take(1))
sdatas=input_file.flatMap(chan)
print(sdatas.take(6))

sdatan=input_file.map(chan)
print(sdatan.take(1))

['A SCANDAL IN BOHEMIA']
['a', 'scandal', 'in', 'bohemia', 'i.', 'to']
[['a', 'scandal', 'in', 'bohemia']]


In [None]:
sdatas.count()

11427

In [None]:
sdatas.countApprox(1,0.9)

11427

In [None]:
sdatas.countByValue()

defaultdict(int,
            {'a': 288,
             'scandal': 4,
             'in': 188,
             'bohemia': 2,
             'i.': 2,
             'to': 299,
             'sherlock': 14,
             'holmes': 27,
             'she': 67,
             'is': 114,
             'always': 7,
             'the': 556,
             'woman.': 3,
             'i': 280,
             'have': 88,
             'seldom': 3,
             'heard': 11,
             'him': 16,
             'mention': 1,
             'her': 40,
             'under': 10,
             'any': 10,
             'other': 12,
             'name.': 1,
             'his': 137,
             'eyes': 9,
             'eclipses': 1,
             'and': 305,
             'predominates': 1,
             'whole': 6,
             'of': 302,
             'sex.': 1,
             'it': 137,
             'was': 140,
             'not': 73,
             'that': 137,
             'he': 129,
             'felt': 2,
             'emotion': 2

In [None]:
print(type(input_file))
print(input_file.count())
print(input_file.getNumPartitions())
print(sc.defaultParallelism)

<class 'pyspark.rdd.RDD'>
636
6
1


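Save the RDD as text. Note that saveAsTextFile writes a directory named w3.txt containing one part file per partition, not a single file.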

In [None]:
sdatas.saveAsTextFile('w3.txt')

Flat Map

In [None]:
print(sdatas.flatMap(lambda x: (x,1)).take(5))
sdatas.map(lambda x: (x,1)).take(5)

['a', 1, 'scandal', 1, 'in']


[('a', 1), ('scandal', 1), ('in', 1), ('bohemia', 1), ('i.', 1)]
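
The difference between the two outputs above can be reproduced in plain Python. The sketch below is only an analogy for the RDD behaviour, assuming a three-word list and a hypothetical helper named pair: map keeps one result per input, while flatMap treats each result as an iterable and concatenates its items.

```python
from itertools import chain

words = ["a", "scandal", "in"]

def pair(word):
    # Hypothetical helper: the same (word, 1) tuple the lambda builds.
    return (word, 1)

# map: exactly one output element per input element.
mapped = [pair(w) for w in words]

# flatMap: every result is iterated and its items are concatenated,
# so each (word, 1) tuple is unpacked into the output stream.
flat_mapped = list(chain.from_iterable(pair(w) for w in words))

print(mapped)       # [('a', 1), ('scandal', 1), ('in', 1)]
print(flat_mapped)  # ['a', 1, 'scandal', 1, 'in', 1]
```

For key-value work, map is therefore the right choice; flatMap suits functions that return a list of tokens, such as chan above.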

In [None]:
rdd2=sdatas.map(lambda x: (x,1)).groupByKey().mapValues(sum).map(lambda x: (x[1],x[0])).sortByKey(False)

rdd2.take(5)

[(556, 'the'), (321, ''), (305, 'and'), (302, 'of'), (299, 'to')]

In [None]:
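# Same word count as above, written with reduceByKey, which combines counts within each partition before the shuffle.
rrd1=sdatas.map(lambda x: (x,1)).reduceByKey(lambda a, b: a + b).map(lambda x: (x[1],x[0])).sortByKey(False)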
rrd1.take(5)


[(556, 'the'), (321, ''), (305, 'and'), (302, 'of'), (299, 'to')]
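
The word-count pipeline above (pair each word with 1, sum per word, swap to (count, word), sort descending) can be sketched on a single machine as follows. This is a minimal illustration, assuming collections.Counter stands in for the per-key sum and using a made-up sentence. The last lines also show where an empty-string token like the (321, '') entry can come from when splitting on a single space.

```python
from collections import Counter

tokens = "the cat and the hat and the bat".split(" ")

# Count occurrences per word (the role of the per-key sum).
counts = Counter(tokens)

# Swap to (count, word) and sort descending. Unlike sortByKey, this also
# orders ties by word, which Spark does not promise.
ranked = sorted(((n, w) for w, n in counts.items()), reverse=True)
print(ranked[:2])   # [(3, 'the'), (2, 'and')]

# split(' ') yields '' for empty lines and repeated spaces;
# split() with no argument drops them.
empty_line = "".split(" ")
double_space = "a  b".split(" ")
whitespace_split = "a  b".split()
print(empty_line, double_space, whitespace_split)
```

Filtering out empty tokens before counting would remove the '' entry from the ranking.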

In [None]:
spark.sparkContext.parallelize(range(1,5)).reduce(lambda x, y: x+y)

10

In [None]:
myCollection = "Spark The Definitive Guide : Big Data Processing Made Simple"\
  .split(" ")
words = spark.sparkContext.parallelize(myCollection, 2)
words.take(5)

['Spark', 'The', 'Definitive', 'Guide', ':']

In [None]:
### using keyBy()
keyword=words.keyBy(lambda word: hash(word))
keyword.take(4)

[(-7604683196601698838, 'Spark'),
 (1381986379548814910, 'The'),
 (-4122473907452972367, 'Definitive'),
 (6601866536240625759, 'Guide')]

In [None]:
## using mapValues
keyword.mapValues(lambda x: x.upper()).collect()

[(-7604683196601698838, 'SPARK'),
 (1381986379548814910, 'THE'),
 (-4122473907452972367, 'DEFINITIVE'),
 (6601866536240625759, 'GUIDE'),
 (1827294539281609183, ':'),
 (-7752043373178716433, 'BIG'),
 (-151788679466617941, 'DATA'),
 (4204150861945194593, 'PROCESSING'),
 (146797214411928478, 'MADE'),
 (-4607150138042100501, 'SIMPLE')]

In [None]:
## extracting keys and/or values
print(keyword.keys().collect())
print(keyword.values().collect())

[-7604683196601698838, 1381986379548814910, -4122473907452972367, 6601866536240625759, 1827294539281609183, -7752043373178716433, -151788679466617941, 4204150861945194593, 146797214411928478, -4607150138042100501]
['Spark', 'The', 'Definitive', 'Guide', ':', 'Big', 'Data', 'Processing', 'Made', 'Simple']


In [None]:
def addFunc(left, right):
    return left + right

def maxFunc(left, right):
    # Return the larger value; without the return the function yields None.
    return max(left, right)

In [None]:
spth_2="sherlock_holmes.txt"
input_file = sc.textFile(spth_2,6)
chars=input_file.flatMap(lambda word: word.split(" "))
KVcharacters=chars.map(lambda letter: (letter,1))
KVcharacters.countByKey()

defaultdict(int,
            {'A': 12,
             'SCANDAL': 1,
             'IN': 1,
             'BOHEMIA': 1,
             'I.': 2,
             'To': 4,
             'Sherlock': 13,
             'Holmes': 27,
             'she': 54,
             'is': 113,
             'always': 7,
             'the': 525,
             'woman.': 3,
             'I': 280,
             'have': 88,
             'seldom': 2,
             'heard': 11,
             'him': 16,
             'mention': 1,
             'her': 40,
             'under': 10,
             'any': 10,
             'other': 12,
             'name.': 1,
             'In': 6,
             'his': 131,
             'eyes': 9,
             'eclipses': 1,
             'and': 289,
             'predominates': 1,
             'whole': 6,
             'of': 301,
             'sex.': 1,
             'It': 37,
             'was': 139,
             'not': 73,
             'that': 131,
             'he': 101,
             'felt': 2,
         

In [None]:
KVcharacters.reduceByKey(lambda x,y: x+y).collect()

[('BOHEMIA', 1),
 ('always', 7),
 ('have', 88),
 ('heard', 11),
 ('of', 301),
 ('It', 37),
 ('love', 6),
 ('machine', 1),
 ('as', 79),
 ('placed', 1),
 ('spoke', 3),
 ('save', 4),
 ('admirable', 2),
 ('drawing', 1),
 ('But', 7),
 ('intrusions', 1),
 ('into', 37),
 ('distracting', 1),
 ('high-power', 1),
 ('there', 28),
 ('', 321),
 ('seen', 6),
 ('away', 7),
 ('finds', 1),
 ('master', 1),
 ('form', 1),
 ('Baker', 6),
 ('books,', 1),
 ('cocaine', 1),
 ('nature.', 1),
 ('observation', 1),
 ('out', 25),
 ('clues,', 1),
 ('official', 1),
 ('account', 4),
 ('murder,', 1),
 ('Atkinson', 1),
 ('Trincomalee,', 1),
 ('family', 1),
 ('signs', 1),
 ('One', 1),
 ('twentieth', 1),
 ('when', 31),
 ('led', 2),
 ('must', 19),
 ('mind', 6),
 ('dark', 3),
 ('know', 18),
 ('even', 6),
 ('head', 3),
 ('told', 5),
 ('new', 3),
 ('own.', 2),
 ('was;', 1),
 ('eye,', 1),
 ('spirit', 1),
 ('before', 13),
 ('“Wedlock', 1),
 ('you,”', 1),
 ('you.”', 7),
 ('answered.', 3),
 ('know?”', 1),
 ('very', 37),
 ('girl?”

In [None]:
### inner joins
x = sc.parallelize([("a", 1), ("b", 4), ("c",5)])
y = sc.parallelize([("a", 2), ("a", 3)])
sorted(x.join(y).collect())

[('a', (1, 2)), ('a', (1, 3))]
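
The result above follows from inner-join semantics: every value on the left is paired with every value on the right that shares its key, and keys present on only one side are dropped. A minimal pure-Python sketch of that rule, with a hypothetical inner_join helper and the same x and y pairs:

```python
from collections import defaultdict

x = [("a", 1), ("b", 4), ("c", 5)]
y = [("a", 2), ("a", 3)]

def inner_join(left, right):
    """Pair every left value with every right value that shares its key."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    return [
        (key, (left_value, right_value))
        for key, left_value in left
        for right_value in right_by_key.get(key, [])
    ]

result = sorted(inner_join(x, y))
print(result)   # [('a', (1, 2)), ('a', (1, 3))]
```

Keys "b" and "c" have no match in y, so they do not appear in the result.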

In [None]:
## zips
y = sc.parallelize(zip(range(0,5), range(10,25)))
y.take(10)

[(0, 10), (1, 11), (2, 12), (3, 13), (4, 14)]

Broadcast Variables: Efficiently share immutable data across all nodes in a Spark cluster. Broadcast variables are sent to each worker node once and then cached there. This reduces network and serialization overhead, because the data is not shipped again with every task.

In [None]:
my_collection = "Spark The Definitive Guide : Big Data \
Processing Made Simple".split(" ")
words = spark.sparkContext.parallelize(my_collection, 2)

supplementalData = {"Spark":1000, \
                    "Definitive":200,\
                    "Big":-300, \
                    "Simple":100, \
                    "Algebra": -1000
                   }

suppBroadcast = spark.sparkContext.broadcast(supplementalData)

print(suppBroadcast.value)

words.map(lambda word: (word, suppBroadcast.value.get(word, 0))) \
    .sortBy(lambda wordPair: wordPair[1]).collect()

{'Spark': 1000, 'Definitive': 200, 'Big': -300, 'Simple': 100, 'Algebra': -1000}


[('Big', -300),
 ('The', 0),
 ('Guide', 0),
 (':', 0),
 ('Data', 0),
 ('Processing', 0),
 ('Made', 0),
 ('Simple', 100),
 ('Definitive', 200),
 ('Spark', 1000)]

Surrogate Key creation in real applications:
Start with an empty lookup (a map/dictionary, i.e. key-value pairs).
Let's say you want a lookup on mobile numbers.
Start processing your input one row at a time (this can be parallelised as well).

For every incoming mobile number, check if it exists in your lookup.
If it exists, fetch the key.
If it doesn't:
    1. Generate a new unique value as the key for that incoming mobile number.
    2. Store the (mobile number, unique key) pair in the lookup.
    3. Return the newly generated unique key to the calling function.

Now your lookup has one row.
Repeat the steps above for each remaining row.
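
The steps above can be sketched in a few lines of Python. This is a single-process illustration only: the class name SurrogateKeyLookup, the method key_for, and the sample numbers are all hypothetical, and a parallel version would need a shared, coordinated lookup.

```python
import itertools

class SurrogateKeyLookup:
    """Hypothetical helper mapping natural keys (e.g. mobile numbers) to surrogate keys."""

    def __init__(self, start=1):
        self._lookup = {}                         # the lookup starts empty
        self._next_key = itertools.count(start)   # source of new unique keys

    def key_for(self, natural_key):
        # Known number: fetch the existing key.
        if natural_key in self._lookup:
            return self._lookup[natural_key]
        # New number: generate a key, store the pair, return the key.
        new_key = next(self._next_key)
        self._lookup[natural_key] = new_key
        return new_key

lookup = SurrogateKeyLookup()
incoming = ["9000000001", "9000000002", "9000000001"]   # made-up numbers
keys = [lookup.key_for(number) for number in incoming]
print(keys)   # [1, 2, 1]
```

The repeated number receives the key it was given the first time, which is the property a surrogate key needs.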

In [None]:
#from pyspark import SparkContext
#sc = SparkContext("local", "Accumulator app")
num = sc.accumulator(10)
def f(x):
    global num
    num += x

rdd = sc.parallelize([20,30,40,50])
rdd.foreach(f)
final = num.value
print ("Accumulated value is -> %i" % (final))

Accumulated value is -> 150


# DataFrame

DataFrames are immutable: once created, they cannot be modified. They are structured like a table, with rows and columns. The DataFrame API runs on Spark SQL and provides SQL-like queries for querying data. DataFrames support data from many different sources, including Hive tables, structured data files, external databases, and existing RDDs. The design is heavily influenced by data frames in R and pandas in Python.

In [None]:
dirty_data = spark.createDataFrame([
          (1,'Porsche','Boxster S','Turbo',2.5,4,22,None)
        , (2,'Aston Martin','Vanquish','Aspirated',6.0,12,16,None)
        , (3,'Porsche','911 Carrera 4S Cabriolet','Turbo',3.0,6,24,None)
        , (3,'General Motors','SPARK ACTIV','Aspirated',1.4,None,32,None)
        , (5,'BMW','COOPER S HARDTOP 2 DOOR','Turbo',2.0,4,26,None)
        , (6,'BMW','330i','Turbo',2.0,None,27,None)
        , (7,'BMW','440i Coupe','Turbo',3.0,6,23,None)
        , (8,'BMW','440i Coupe','Turbo',3.0,6,23,None)
        , (9,'Mercedes-Benz',None,None,None,None,27,None)
        , (10,'Mercedes-Benz','CLS 550','Turbo',4.7,8,21,79231)
        , (11,'Volkswagen','GTI','Turbo',2.0,4,None,None)
        , (12,'Ford Motor Company','FUSION AWD','Turbo',2.7,6,20,None)
        , (13,'Nissan','Q50 AWD RED SPORT','Turbo',3.0,6,22,None)
        , (14,'Nissan','Q70 AWD','Aspirated',5.6,8,18,None)
        , (15,'Kia','Stinger RWD','Turbo',2.0,4,25,None)
        , (16,'Toyota','CAMRY HYBRID LE','Aspirated',2.5,4,46,None)
        , (16,'Toyota','CAMRY HYBRID LE','Aspirated',2.5,4,46,None)
        , (18,'FCA US LLC','300','Aspirated',3.6,6,23,None)
        , (19,'Hyundai','G80 AWD','Turbo',3.3,6,20,None)
        , (20,'Hyundai','G80 AWD','Turbo',3.3,6,20,None)
        , (21,'BMW','X5 M','Turbo',4.4,8,18,121231)
        , (22,'GE','K1500 SUBURBAN 4WD','Aspirated',5.3,8,18,None)
    ], ['Id','Manufacturer','Model','EngineType','Displacement',
        'Cylinders','FuelEconomy','MSRP'])

distinct(): Removes duplicate elements from an RDD or DataFrame and returns the result as a new RDD or DataFrame.

In [None]:
# row count vs. distinct row count
dirty_data.count(), dirty_data.distinct().count()

(22, 21)

In [None]:
#duplicated row
(
    dirty_data
    .groupby(dirty_data.columns)
    .count()
    .filter('count > 1')
    .show()
)

+---+------------+---------------+----------+------------+---------+-----------+----+-----+
| Id|Manufacturer|          Model|EngineType|Displacement|Cylinders|FuelEconomy|MSRP|count|
+---+------------+---------------+----------+------------+---------+-----------+----+-----+
| 16|      Toyota|CAMRY HYBRID LE| Aspirated|         2.5|        4|         46|NULL|    2|
+---+------------+---------------+----------+------------+---------+-----------+----+-----+



In [None]:
# remove the duplicated rows (the original DataFrame doesn't change)
full_removed = dirty_data.dropDuplicates()
full_removed.count()

21

In [None]:
# count of rows
no_ids = (
    full_removed
    .select([col for col in full_removed.columns if col != 'Id'])
)

no_ids.count(), no_ids.distinct().count()

(21, 19)

In [None]:
# duplicated row
(
    full_removed
    .groupby([col for col in full_removed.columns if col != 'Id'])
    .count()
    .filter('count > 1')
    .show()
)

+------------+----------+----------+------------+---------+-----------+----+-----+
|Manufacturer|     Model|EngineType|Displacement|Cylinders|FuelEconomy|MSRP|count|
+------------+----------+----------+------------+---------+-----------+----+-----+
|         BMW|440i Coupe|     Turbo|         3.0|        6|         23|NULL|    2|
|     Hyundai|   G80 AWD|     Turbo|         3.3|        6|         20|NULL|    2|
+------------+----------+----------+------------+---------+-----------+----+-----+



In [None]:
# removing the duplicated records
id_removed = full_removed.dropDuplicates(
    subset = [col for col in full_removed.columns if col != 'Id']
)
# count
id_removed.count()

19
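
What dropDuplicates with a subset does can be pictured in plain Python: two rows count as duplicates when they agree on the subset columns, whatever their Id. The sketch below uses three abbreviated rows from dirty_data and a hypothetical drop_duplicates helper. It keeps the first row of each group, which is an assumption of the sketch; Spark does not promise which duplicate survives.

```python
rows = [
    {"Id": 7, "Manufacturer": "BMW", "Model": "440i Coupe"},
    {"Id": 8, "Manufacturer": "BMW", "Model": "440i Coupe"},
    {"Id": 15, "Manufacturer": "Kia", "Model": "Stinger RWD"},
]

def drop_duplicates(records, subset):
    """Keep the first record seen for each combination of the subset columns."""
    seen = set()
    kept = []
    for record in records:
        signature = tuple(record[col] for col in subset)
        if signature not in seen:
            seen.add(signature)
            kept.append(record)
    return kept

subset = [col for col in rows[0] if col != "Id"]
deduped = drop_duplicates(rows, subset)
kept_ids = [record["Id"] for record in deduped]
print(kept_ids)   # [7, 15]
```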

In [None]:
# duplicated IDs
import pyspark.sql.functions as fn

id_removed.agg(
      fn.count('Id').alias('CountOfIDs')
    , fn.countDistinct('Id').alias('CountOfDistinctIDs')
).show()

+----------+------------------+
|CountOfIDs|CountOfDistinctIDs|
+----------+------------------+
|        19|                18|
+----------+------------------+



In [None]:
# duplicated Id
(
    id_removed
    .groupby('Id')
    .count()
    .filter('count > 1')
    .show()
)

+---+-----+
| Id|count|
+---+-----+
|  3|    2|
+---+-----+



In [None]:
(
    id_removed
    .filter('Id = 3')
    .show()
)

+---+--------------+--------------------+----------+------------+---------+-----------+----+
| Id|  Manufacturer|               Model|EngineType|Displacement|Cylinders|FuelEconomy|MSRP|
+---+--------------+--------------------+----------+------------+---------+-----------+----+
|  3|General Motors|         SPARK ACTIV| Aspirated|         1.4|     NULL|         32|NULL|
|  3|       Porsche|911 Carrera 4S Ca...|     Turbo|         3.0|        6|         24|NULL|
+---+--------------+--------------------+----------+------------+---------+-----------+----+



In [None]:
new_id = (
    id_removed
    .select(
        [fn.monotonically_increasing_id().alias('Id')] +
        [col for col in id_removed.columns if col != 'Id']
    )
)

new_id.show()

+---+------------------+--------------------+----------+------------+---------+-----------+------+
| Id|      Manufacturer|               Model|EngineType|Displacement|Cylinders|FuelEconomy|  MSRP|
+---+------------------+--------------------+----------+------------+---------+-----------+------+
|  0|     Mercedes-Benz|             CLS 550|     Turbo|         4.7|        8|         21| 79231|
|  1|               BMW|                X5 M|     Turbo|         4.4|        8|         18|121231|
|  2|    General Motors|         SPARK ACTIV| Aspirated|         1.4|     NULL|         32|  NULL|
|  3|     Mercedes-Benz|                NULL|      NULL|        NULL|     NULL|         27|  NULL|
|  4|Ford Motor Company|          FUSION AWD|     Turbo|         2.7|        6|         20|  NULL|
|  5|               BMW|COOPER S HARDTOP ...|     Turbo|         2.0|        4|         26|  NULL|
|  6|      Aston Martin|            Vanquish| Aspirated|         6.0|       12|         16|  NULL|
|  7|     

In [None]:
(
    spark.createDataFrame(
        new_id.rdd.map(
           lambda row: (
                 row['Id']
               , sum([c is None for c in row])
           )
        )
        .filter(lambda el: el[1] > 1)
        .collect()
        ,['Id', 'CountMissing']
    )
    .orderBy('CountMissing', ascending=False)
    .show()
)

+---+------------+
| Id|CountMissing|
+---+------------+
|  3|           5|
|  2|           2|
|  7|           2|
|  9|           2|
+---+------------+



In [None]:
(
    new_id
    .where('Id == 3')
    .show()
)

+---+-------------+-----+----------+------------+---------+-----------+----+
| Id| Manufacturer|Model|EngineType|Displacement|Cylinders|FuelEconomy|MSRP|
+---+-------------+-----+----------+------------+---------+-----------+----+
|  3|Mercedes-Benz| NULL|      NULL|        NULL|     NULL|         27|NULL|
+---+-------------+-----+----------+------------+---------+-----------+----+



In [None]:
merc_out = new_id.dropna(thresh=4)
new_id.count(), merc_out.count()

(19, 18)

'''aggregate merc_out
loop over all columns
count the non-null values in each column
divide that by the total number of rows
this gives the ratio of available values for that column
subtract that from 1
this gives the ratio of missing values for that column
rename that column as <column>_miss
after the loop is over, the higher the ratio, the more missing values in a column
collect the output (a single row)
convert it to a dictionary
extract the dictionary items
sort them by the second element (the ratio) in descending order '''

In [None]:

for k, v in sorted(
    merc_out.agg(*[
               (1 - (fn.count(c) / fn.count('*'))).alias(c + '_miss')
               for c in merc_out.columns
           ])
        .collect()[0]
        .asDict()
        .items()
    , key=lambda el: el[1]
    , reverse=True
):
    print(k, v)

MSRP_miss 0.8888888888888888
Cylinders_miss 0.11111111111111116
FuelEconomy_miss 0.05555555555555558
Id_miss 0.0
Manufacturer_miss 0.0
Model_miss 0.0
EngineType_miss 0.0
Displacement_miss 0.0
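
The same per-column calculation can be checked by hand on a tiny table. The following is a pure-Python sketch with four made-up rows (not the notebook's data), where None plays the role of NULL and the expression mirrors 1 - count(c) / count('*').

```python
rows = [
    {"Cylinders": 4, "FuelEconomy": 22, "MSRP": None},
    {"Cylinders": None, "FuelEconomy": 32, "MSRP": None},
    {"Cylinders": 8, "FuelEconomy": 21, "MSRP": 79231},
    {"Cylinders": 4, "FuelEconomy": None, "MSRP": None},
]

total = len(rows)

# 1 - (non-null count / row count) = share of missing values per column.
missing_ratio = {
    col + "_miss": 1 - sum(row[col] is not None for row in rows) / total
    for col in rows[0]
}

# Highest ratio first, as in the cell above.
for name, ratio in sorted(missing_ratio.items(), key=lambda el: el[1], reverse=True):
    print(name, ratio)
```

Here MSRP is missing in three of four rows, so MSRP_miss is 0.75 and it sorts to the top.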


In [None]:
no_MSRP = merc_out.select([col for col in new_id.columns if col != 'MSRP'])
no_MSRP.show()

+---+------------------+--------------------+----------+------------+---------+-----------+
| Id|      Manufacturer|               Model|EngineType|Displacement|Cylinders|FuelEconomy|
+---+------------------+--------------------+----------+------------+---------+-----------+
|  0|     Mercedes-Benz|             CLS 550|     Turbo|         4.7|        8|         21|
|  1|               BMW|                X5 M|     Turbo|         4.4|        8|         18|
|  2|    General Motors|         SPARK ACTIV| Aspirated|         1.4|     NULL|         32|
|  4|Ford Motor Company|          FUSION AWD|     Turbo|         2.7|        6|         20|
|  5|               BMW|COOPER S HARDTOP ...|     Turbo|         2.0|        4|         26|
|  6|      Aston Martin|            Vanquish| Aspirated|         6.0|       12|         16|
|  7|        Volkswagen|                 GTI|     Turbo|         2.0|        4|       NULL|
|  8|               Kia|         Stinger RWD|     Turbo|         2.0|        4| 

Missing Observations

In [None]:

multipliers = (
    no_MSRP
    .agg(
          fn.mean(
              fn.col('FuelEconomy') /
              (
                  fn.col('Displacement') * fn.col('Cylinders')
              )
          ).alias('FuelEconomy')
        , fn.mean(
            fn.col('Cylinders') /
            fn.col('Displacement')
        ).alias('Cylinders')
    )
).toPandas().to_dict('records')[0]

multipliers

{'FuelEconomy': 1.4957485048359973, 'Cylinders': 1.8353365984789107}

In [None]:
cylinders=5
disp=2
calc=cylinders/disp
print("calc: ", calc)

print("new cylinder value: ", int(calc * disp))

calc:  2.5
new cylinder value:  5


In [None]:
cylinders=None
cylinders=calc
disp=3
calc=cylinders/disp
print("calc: ", calc)

print("new cylinder value: ", int(calc * disp))

calc:  0.8333333333333334
new cylinder value:  2


In [None]:
cylinders=calc
print("new cylinder value: ", cylinders)

new cylinder value:  0.8333333333333334


In [None]:
imputed = (
    no_MSRP
    .withColumn('FuelEconomy', fn.col('FuelEconomy')   / fn.col('Displacement') / fn.col('Cylinders'))
    .withColumn('Cylinders',   fn.col('Cylinders')   / fn.col('Displacement'))
    .fillna(multipliers)
    .withColumn('Cylinders',   (fn.col('Cylinders')   * fn.col('Displacement')).cast('integer'))
    .withColumn('FuelEconomy', fn.col('FuelEconomy') * fn.col('Displacement') * fn.col('Cylinders'))
)

imputed.show()

+---+------------------+--------------------+----------+------------+---------+------------------+
| Id|      Manufacturer|               Model|EngineType|Displacement|Cylinders|       FuelEconomy|
+---+------------------+--------------------+----------+------------+---------+------------------+
|  0|     Mercedes-Benz|             CLS 550|     Turbo|         4.7|        8|              21.0|
|  1|               BMW|                X5 M|     Turbo|         4.4|        8|              18.0|
|  2|    General Motors|         SPARK ACTIV| Aspirated|         1.4|        2| 4.188095813540793|
|  4|Ford Motor Company|          FUSION AWD|     Turbo|         2.7|        5|16.666666666666668|
|  5|               BMW|COOPER S HARDTOP ...|     Turbo|         2.0|        4|              26.0|
|  6|      Aston Martin|            Vanquish| Aspirated|         6.0|       12|              16.0|
|  7|        Volkswagen|                 GTI|     Turbo|         2.0|        4|11.965988038687978|
|  8|     
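
The imputation above divides Cylinders by Displacement, fills the gaps, and multiplies back with a cast to integer. A cast truncates, so a product that lands just below a whole number loses one; this is consistent with FUSION AWD showing 5 cylinders above although its input row has 6. The pure-Python sketch below mirrors that round trip and compares truncation with rounding. The function name round_trip is hypothetical, and this is an illustration rather than a change to the cell above.

```python
def round_trip(cylinders, displacement, use_round):
    """Scale by displacement and back, as the imputation does for known values."""
    ratio = cylinders / displacement
    restored = ratio * displacement   # may carry a tiny floating-point error
    return round(restored) if use_round else int(restored)

# (Cylinders, Displacement) pairs taken from the dirty_data rows.
pairs = [(4, 2.5), (12, 6.0), (6, 3.0), (8, 4.7), (6, 2.7), (8, 5.6), (6, 3.3)]

for cyl, disp in pairs:
    truncated = round_trip(cyl, disp, use_round=False)
    rounded = round_trip(cyl, disp, use_round=True)
    print(cyl, disp, truncated, rounded)
```

Rounding before the integer conversion recovers every original value in this list, whereas truncation can come out one lower.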

Identifying Outliers: Handling outliers in Spark involves identifying, analyzing, and possibly removing or transforming data points that deviate significantly from the rest of the dataset.

Z-Score: Measures how many standard deviations an element is from the mean. Typically, a Z-score beyond ±3 indicates an outlier.

Interquartile Range (IQR): Measures the spread between the 25th (Q1) and 75th (Q3) percentiles. Outliers are usually considered to be values outside the range from $Q1 - 1.5 \times IQR$ to $Q3 + 1.5 \times IQR$.

Visualization:

Box Plots: Helpful in visualizing the spread and identifying potential outliers.
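
Both rules can be tried on a small list with Python's statistics module. The sketch below assumes the follower counts used in the next cell as sample data, the sample standard deviation for the Z-score, and the module's default ("exclusive") quartile method; Spark's approxQuantile may place the quartiles slightly differently.

```python
import statistics

followers = [10, 20, 21, 22, 40, 9, 11, 99, 0.5, 11, 20, 35]

# Z-score: distance from the mean in units of standard deviation.
mean = statistics.mean(followers)
stdev = statistics.stdev(followers)   # sample standard deviation
z_outliers = [x for x in followers if abs((x - mean) / stdev) > 3]

# IQR fences: Q1 - 1.5 * IQR and Q3 + 1.5 * IQR.
q1, _, q3 = statistics.quantiles(followers, n=4)
iqr = q3 - q1
low, high = q1 - 1.5 * iqr, q3 + 1.5 * iqr
iqr_outliers = [x for x in followers if x < low or x > high]

print(z_outliers, iqr_outliers)
```

On this sample the IQR fences flag only 99, while no value exceeds a Z-score of 3 because the extreme value inflates the standard deviation. That is one reason the IQR rule is often preferred for small, skewed samples.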

In [None]:
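followers=[10,20,21,22,40,9,11,99,0.5,11,20,35]
# the same values sorted in ascending order
sorted_followers=[0.5,9,10,11,11,20,20,21,22,35,40,99]
# first value of each quarter of the sorted list
q1=[0.5]
q2=[11]
q3=[20]
q4=[35]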

In [None]:
import numpy as np

In [None]:
followers_minus_top_outlier=[10,20,21,22,40,9,11,0.5,11,20,35]
followers_minus_all_outliers=[10,20,21,22,40,9,11,11,20,35]
features = ['Displacement', 'Cylinders', 'FuelEconomy']
quantiles = [0.25, 0.75]

cut_off_points = []

for feature in features:
    quants = imputed.approxQuantile(feature, quantiles, 0.05)

    IQR = quants[1] - quants[0]
    cut_off_points.append((feature, [
        quants[0] - 1.5 * IQR,
        quants[1] + 1.5 * IQR,
    ]))

cut_off_points = dict(cut_off_points)

outliers = imputed.select(*['id'] + [
       (
           (imputed[f] < cut_off_points[f][0]) |
           (imputed[f] > cut_off_points[f][1])
       ).alias(f + '_o') for f in features
  ])
outliers.show()

+---+--------------+-----------+-------------+
| id|Displacement_o|Cylinders_o|FuelEconomy_o|
+---+--------------+-----------+-------------+
|  0|         false|      false|        false|
|  1|         false|      false|        false|
|  2|         false|      false|         true|
|  4|         false|      false|        false|
|  5|         false|      false|        false|
|  6|         false|       true|        false|
|  7|         false|      false|        false|
|  8|         false|      false|        false|
|  9|         false|      false|        false|
| 10|         false|      false|        false|
| 11|         false|      false|        false|
| 12|         false|      false|        false|
| 13|         false|      false|        false|
| 14|         false|      false|        false|
| 15|         false|      false|        false|
| 16|         false|      false|        false|
| 17|         false|      false|         true|
| 18|         false|      false|        false|
+---+--------

In [None]:
with_outliers_flag = imputed.join(outliers, on='Id')

(
    with_outliers_flag
    .filter('FuelEconomy_o')
    .select('Id', 'Manufacturer', 'Model', 'FuelEconomy')
    .show()
)

+---+--------------+---------------+-----------------+
| Id|  Manufacturer|          Model|      FuelEconomy|
+---+--------------+---------------+-----------------+
|  2|General Motors|    SPARK ACTIV|4.188095813540793|
| 17|        Toyota|CAMRY HYBRID LE|             46.0|
+---+--------------+---------------+-----------------+



In [None]:
no_outliers = (
    with_outliers_flag
    .filter('!FuelEconomy_o')
    .select(imputed.columns)
)

Descriptive Statistics

In [None]:
descriptive_stats = no_outliers.describe(features)
descriptive_stats.show()

+-------+------------------+-----------------+------------------+
|summary|      Displacement|        Cylinders|       FuelEconomy|
+-------+------------------+-----------------+------------------+
|  count|                16|               16|                16|
|   mean|           3.44375|            6.125|19.600446608398165|
| stddev|1.3549753995306828|2.276693508870558| 4.666647767373752|
|    min|               2.0|                3| 8.974491029015983|
|    max|               6.0|               12|              26.0|
+-------+------------------+-----------------+------------------+



In [None]:
descriptive_stats_all = no_outliers.describe()
descriptive_stats_all.show()

+-------+-----------------+------------+-----+----------+------------------+-----------------+------------------+
|summary|               Id|Manufacturer|Model|EngineType|      Displacement|        Cylinders|       FuelEconomy|
+-------+-----------------+------------+-----+----------+------------------+-----------------+------------------+
|  count|               16|          16|   16|        16|                16|               16|                16|
|   mean|           9.3125|        NULL|300.0|      NULL|           3.44375|            6.125|19.600446608398165|
| stddev|5.287958017987662|        NULL| NULL|      NULL|1.3549753995306828|2.276693508870558| 4.666647767373752|
|    min|                0|Aston Martin|  300| Aspirated|               2.0|                3| 8.974491029015983|
|    max|               18|  Volkswagen| X5 M|     Turbo|               6.0|               12|              26.0|
+-------+-----------------+------------+-----+----------+------------------+------------

In [None]:
(
    no_outliers
    .select(features)
    .groupBy('Cylinders')
    .agg(*[
          fn.count('*').alias('Count')
        , fn.mean('FuelEconomy').alias('MPG_avg')
        , fn.mean('Displacement').alias('Disp_avg')
        , fn.stddev('FuelEconomy').alias('MPG_stdev')
        , fn.stddev('Displacement').alias('Disp_stdev')
    ])
    .orderBy('Cylinders')
).show()

+---------+-----+------------------+--------+------------------+-------------------+
|Cylinders|Count|           MPG_avg|Disp_avg|         MPG_stdev|         Disp_stdev|
+---------+-----+------------------+--------+------------------+-------------------+
|        3|    1| 8.974491029015983|     2.0|              NULL|               NULL|
|        4|    4|21.241497009671995|   2.125| 6.413009924998989|               0.25|
|        5|    1|16.666666666666668|     2.7|              NULL|               NULL|
|        6|    5|              22.4|    3.18|1.5165750888103107|0.26832815729997495|
|        8|    4|             18.75|     5.0|               1.5| 0.5477225575051655|
|       12|    1|              16.0|     6.0|              NULL|               NULL|
+---------+-----+------------------+--------+------------------+-------------------+
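
The NULL entries in MPG_stdev and Disp_stdev all belong to groups with a single row. fn.stddev is the sample standard deviation, which divides by n - 1 and is therefore undefined when n = 1. The sketch below reproduces that behaviour in plain Python; the helper name sample_stddev and the input values are illustrative assumptions, not values read from the dataset.

```python
import math


def sample_stddev(values):
    """Sample standard deviation (n - 1 denominator); None when n < 2."""
    n = len(values)
    if n < 2:
        # A single observation has no sample variance, which corresponds
        # to the NULL cells shown for the one-row groups.
        return None
    mean = sum(values) / n
    variance = sum((v - mean) ** 2 for v in values) / (n - 1)
    return math.sqrt(variance)


# Illustrative inputs only (hypothetical, not taken from the data).
single_row_group = [2.0]
four_row_group = [2.0, 2.0, 2.0, 2.5]

print(sample_stddev(single_row_group))
print(sample_stddev(four_row_group))
```

If a defined value is needed for one-row groups, the population variant (fn.stddev_pop) is one option, at the cost of a different statistical meaning.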



Computing Correlation:


In [None]:
(
    no_outliers
    .corr('Cylinders', 'Displacement')
)

0.9381829964408112
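
DataFrame.corr returns the Pearson correlation coefficient of the two columns. As a rough illustration of what that number measures, here is a minimal plain-Python sketch; the function name pearson and the sample lists are assumptions made for the example and are unrelated to the car dataset.

```python
import math


def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Sum of products of deviations from the means (unnormalised covariance).
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    # Sums of squared deviations (unnormalised variances).
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)


# Hypothetical data: a perfectly increasing and a perfectly decreasing pair.
print(pearson([1, 2, 3], [2, 4, 6]))
print(pearson([1, 2, 3], [3, 2, 1]))
```

A value near 1, such as the 0.938 above for Cylinders and Displacement, indicates a strong positive linear relationship.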

In [None]:
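n_features = len(features)

# Build the upper triangle of the correlation matrix; cells below the
# diagonal stay None because corr(a, b) == corr(b, a).
corr = []

for i in range(n_features):
    temp = [None] * i  # pad the columns left of the diagonal

    for j in range(i, n_features):
        temp.append(no_outliers.corr(features[i], features[j]))
    corr.append([features[i]] + temp)

correlations = spark.createDataFrame(corr, ['Column'] + features)

correlations.show()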

+------------+------------+------------------+--------------------+
|      Column|Displacement|         Cylinders|         FuelEconomy|
+------------+------------+------------------+--------------------+
|Displacement|         1.0|0.9381829964408112|-0.10757908872387642|
|   Cylinders|        NULL|               1.0|-0.04218546545035...|
| FuelEconomy|        NULL|              NULL|                 1.0|
+------------+------------+------------------+--------------------+
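
Only the upper triangle is filled in because correlation is symmetric. If a full matrix is easier to read or plot, the missing cells can be mirrored from above the diagonal. The sketch below works on plain Python lists shaped like the rows built above; the helper name mirror_upper_triangle is an assumption, and the numbers are rounded from the displayed output for illustration.

```python
def mirror_upper_triangle(rows):
    """Fill None cells below the diagonal from their mirror above it.

    Each row is [label, v0, v1, ...] with None left of the diagonal.
    """
    values = [row[1:] for row in rows]
    n = len(values)
    full = [
        # Always read from the upper triangle: row index <= column index.
        [values[min(i, j)][max(i, j)] for j in range(n)]
        for i in range(n)
    ]
    return [[rows[i][0]] + full[i] for i in range(n)]


# Rounded, illustrative values in the same layout as the table above.
upper = [
    ["Displacement", 1.0, 0.94, -0.11],
    ["Cylinders", None, 1.0, -0.04],
    ["FuelEconomy", None, None, 1.0],
]

for row in mirror_upper_triangle(upper):
    print(row)
```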



# MLlib and Cluster Deployment

In [None]:
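import os

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

# Reuse the active session if one exists; otherwise start a local one.
spark = SparkSession.builder.master("local").appName("sparksql").getOrCreate()

# getOrCreate() returns the context that already backs `spark`;
# `conf` only takes effect if no context is running yet.
conf = SparkConf().setMaster("local").setAppName("sparksql")
sc = SparkContext.getOrCreate(conf)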

In [None]:
flightData2015 = spark\
.read\
.option("inferSchema", "true")\
.option("header", "true")\
.csv("2015-summary.csv")

In [None]:
flightData2015.take(1)

[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15)]

In [None]:
flightData2015.createOrReplaceTempView("flight_data_2015")

In [None]:
sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(*)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")


dataFrameWay = flightData2015\
.groupBy("DEST_COUNTRY_NAME")\
.count()

sqlWay.explain()
dataFrameWay.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#173], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#173, 200), ENSURE_REQUIREMENTS, [plan_id=841]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#173], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#173] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/2015-summary.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[DEST_COUNTRY_NAME#265], functions=[count(1)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#265, 200), ENSURE_REQUIREMENTS, [plan_id=854]
      +- HashAggregate(keys=[DEST_COUNTRY_NAME#265], functions=[partial_count(1)])
         +- FileScan csv [DEST_COUNTRY_NAME#265] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/c
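
Both plans have the same shape: a partial count inside each input partition, an exchange that hash-partitions the partial results by DEST_COUNTRY_NAME, and a final count that merges them. The plain-Python sketch below imitates those three steps on toy data. The function names, the four target partitions, and the use of zlib.crc32 as a stand-in hash are all assumptions made for illustration; Spark uses its own hash function and, in the plans above, 200 partitions.

```python
import zlib
from collections import Counter


def partial_count(partition):
    """Step 1: count keys locally within one input partition."""
    return Counter(partition)


def shuffle(partials, num_partitions):
    """Step 2: route each (key, partial count) pair by a hash of the key."""
    buckets = [[] for _ in range(num_partitions)]
    for partial in partials:
        for key, cnt in partial.items():
            target = zlib.crc32(key.encode("utf-8")) % num_partitions
            buckets[target].append((key, cnt))
    return buckets


def final_count(bucket):
    """Step 3: merge the partial counts that landed in one partition."""
    totals = Counter()
    for key, cnt in bucket:
        totals[key] += cnt
    return totals


# Toy input split into two partitions (hypothetical rows).
partitions = [
    ["United States", "Canada", "United States"],
    ["Canada", "Mexico"],
]

partials = [partial_count(p) for p in partitions]
result = Counter()
for bucket in shuffle(partials, num_partitions=4):
    result.update(final_count(bucket))

print(dict(result))
```

Counting locally first means only one small record per key and partition crosses the exchange, rather than every input row.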

In [None]:
sqlWay.count()

132

In [None]:
dataFrameWay = flightData2015\
.groupBy("DEST_COUNTRY_NAME")\
.count()

dataFrameWay.take(1)

[Row(DEST_COUNTRY_NAME='Anguilla', count=1)]

In [None]:
from pyspark.sql.functions import desc
flightData2015\
.groupBy("DEST_COUNTRY_NAME")\
.sum("count")\
.withColumnRenamed("sum(count)", "destination_total")\
.sort(desc("destination_total"))\
.limit(5)\
.take(5)

[Row(DEST_COUNTRY_NAME='United States', destination_total=411352),
 Row(DEST_COUNTRY_NAME='Canada', destination_total=8399),
 Row(DEST_COUNTRY_NAME='Mexico', destination_total=7140),
 Row(DEST_COUNTRY_NAME='United Kingdom', destination_total=2025),
 Row(DEST_COUNTRY_NAME='Japan', destination_total=1548)]
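
Sorting in descending order and then limiting the result asks only for the largest few totals, which does not require a complete sort. The plain-Python sketch below shows the same idea with heapq.nlargest; the helper name top_n is an assumption, and the totals are the five values shown in the output above.

```python
import heapq

# Destination totals copied from the output above.
destination_totals = {
    "United States": 411352,
    "Canada": 8399,
    "Mexico": 7140,
    "United Kingdom": 2025,
    "Japan": 1548,
}


def top_n(totals, n):
    """Return the n (name, total) pairs with the largest totals, descending."""
    return heapq.nlargest(n, totals.items(), key=lambda item: item[1])


top_three = top_n(destination_totals, 3)
print(top_three)
```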

In [None]:
sqlWay = spark.sql("""
CREATE TABLE flights_11_new (
DEST_COUNTRY_NAME STRING,
ORIGIN_COUNTRY_NAME STRING,
count LONG)
USING JSON OPTIONS (path '2015-summary.json')
"""
)

In [None]:
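# sqlWay is the empty result of CREATE TABLE; this view is global_temp.flights_11_new, so unqualified queries still read the table.
sqlWay.createGlobalTempView('flights_11_new')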


In [None]:
sqlWay=spark.sql("""
    SELECT * FROM flights_11_new
    """)
sqlWay.show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
+-----------------+-------------------+-----+



In [None]:
countsql=spark.sql("SELECT * FROM flights_2_new WHERE count BETWEEN 40 AND 100")
countsql.show()

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
+-----------------+-------------------+-----+



In [None]:
dataFrameWay.write.bucketBy(42, "DEST_COUNTRY_NAME").sortBy("count").saveAsTable("flight_in_table_22")
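
bucketBy(42, "DEST_COUNTRY_NAME") distributes the rows into 42 buckets according to a hash of the destination name, and sortBy("count") orders the rows inside each bucket. The sketch below illustrates only the assignment rule, hash of the key modulo the number of buckets. The helper name bucket_for and the use of zlib.crc32 are assumptions for illustration; Spark applies its own hash function, so the actual bucket numbers will differ.

```python
import zlib


def bucket_for(key, num_buckets=42):
    """Map a key to a bucket index in [0, num_buckets) via a stand-in hash."""
    return zlib.crc32(key.encode("utf-8")) % num_buckets


# The same key always maps to the same bucket, which is what lets a
# bucketed table avoid a shuffle when it is later joined or grouped on that key.
for name in ["Ghana", "Italy", "Ghana"]:
    print(name, bucket_for(name))
```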

In [None]:
tabsqlWay = spark.sql("""
SELECT *
FROM flight_in_table_22
""")
tabsqlWay.show()

+--------------------+-----+
|   DEST_COUNTRY_NAME|count|
+--------------------+-----+
|Saint Vincent and...|    1|
|Turks and Caicos ...|    1|
|            Dominica|    1|
|               Ghana|    1|
|          Martinique|    1|
|Federated States ...|    1|
|            Bulgaria|    1|
|              Taiwan|    1|
|       Cote d'Ivoire|    1|
| Trinidad and Tobago|    1|
|              Poland|    1|
|            Pakistan|    1|
|            Paraguay|    1|
|              France|    1|
|             Algeria|    1|
|    Saint Barthelemy|    1|
|            Ethiopia|    1|
|        Burkina Faso|    1|
|             Bahrain|    1|
|               Italy|    1|
+--------------------+-----+
only showing top 20 rows

