In [1]:
# findspark.init() is what makes a SPARK_HOME installation of pyspark importable,
# so it has to run *before* `import pyspark`; calling it after the import has no
# effect on the module that was already loaded.
import findspark
findspark.init()

import pyspark

In [2]:
from pyspark.sql import SparkSession

In [3]:
import findspark
findspark.init()

In [4]:
# Build (or reuse, via getOrCreate) the notebook's SparkSession.
# NOTE(review): spark.driver.host is pinned to localhost, presumably so the driver
# does not depend on the laptop's hostname resolving — confirm this is still needed.
spark = (
    SparkSession.builder
    .appName("pyspark_training")
    .config("spark.driver.host", "localhost")
    .getOrCreate()
)
              

In [5]:
 spark

In [6]:
spark.getActiveSession()

In [7]:
spark.version

'3.2.0'

In [8]:
spark.sparkContext.applicationId

'local-1679037373500'

In [9]:
sc = spark.sparkContext

In [10]:
sc.startTime

1679037371798

In [11]:
sc.setLogLevel('INFO')

In [12]:
sc.uiWebUrl

'http://LAPTOP-9BMJ89EQ:4040'

In [13]:
import os 

In [14]:
os.environ.get('HADOOP_HOME')

'C:\\Hadoop'

# Create an RDD using range

In [15]:
list(range(11,20,2))

[11, 13, 15, 17, 19]

In [16]:
rdd_range = sc.parallelize(range(20))

In [17]:
rdd_range.count()

20

In [18]:
rdd_range.collect()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

# Create an RDD using list

In [19]:
details = [('Adam',26),('Jack',25),('Rachel',29)]

In [20]:
rdd_details = sc.parallelize(details)

In [21]:
type(rdd_details)

pyspark.rdd.RDD

# Count the number of rows

In [22]:
details_count = rdd_details.count()

In [23]:
details_count

3

# Fetch the first row

In [24]:
details_first = rdd_details.first()

In [25]:
details_first

('Adam', 26)

In [26]:
details_first[0]

'Adam'

# Avoid collect() on huge datasets: it copies the entire RDD back to the driver process, and the driver runs out of memory if the data does not fit. Prefer take(n), first(), or aggregations that return small results.

In [27]:
rdd_details.collect()

[('Adam', 26), ('Jack', 25), ('Rachel', 29)]

In [28]:
# Print each (name, age) row as "name-age".
for name, age in rdd_details.collect():
    print(f"{name}-{age}")

Adam-26
Jack-25
Rachel-29


In [29]:
# Project to the name column on the executors, so only the names (not whole rows)
# are shipped to the driver by collect().
Name = rdd_details.map(lambda row: row[0]).collect()

In [30]:
Name

['Adam', 'Jack', 'Rachel']

# Get the first n rows

In [31]:
rdd_details.take(2)

[('Adam', 26), ('Jack', 25)]

# word count

In [32]:
text = 'Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming'

In [33]:
text

'Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming'

In [34]:
rdd = sc.parallelize([text])

In [35]:
rdd.collect()

['Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. It also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming']

In [36]:
def row_split(x):
    """Split a line of text into tokens on single space characters.

    Unlike ``str.split()`` with no argument, consecutive spaces produce empty
    string tokens, and an empty line yields ``['']``.
    """
    separator = " "
    tokens = x.split(separator)
    return tokens

In [37]:
row_split(text)

['Apache',
 'Spark',
 'is',
 'a',
 'fast',
 'and',
 'general-purpose',
 'cluster',
 'computing',
 'system.',
 'It',
 'provides',
 'high-level',
 'APIs',
 'in',
 'Java,',
 'Scala,',
 'Python',
 'and',
 'R,',
 'and',
 'an',
 'optimized',
 'engine',
 'that',
 'supports',
 'general',
 'execution',
 'graphs.',
 'It',
 'also',
 'supports',
 'a',
 'rich',
 'set',
 'of',
 'higher-level',
 'tools',
 'including',
 'Spark',
 'SQL',
 'for',
 'SQL',
 'and',
 'structured',
 'data',
 'processing,',
 'MLlib',
 'for',
 'machine',
 'learning,',
 'GraphX',
 'for',
 'graph',
 'processing,',
 'and',
 'Spark',
 'Streaming']

In [38]:
l = rdd.map(lambda x:x.split(' ')).collect()
l

[['Apache',
  'Spark',
  'is',
  'a',
  'fast',
  'and',
  'general-purpose',
  'cluster',
  'computing',
  'system.',
  'It',
  'provides',
  'high-level',
  'APIs',
  'in',
  'Java,',
  'Scala,',
  'Python',
  'and',
  'R,',
  'and',
  'an',
  'optimized',
  'engine',
  'that',
  'supports',
  'general',
  'execution',
  'graphs.',
  'It',
  'also',
  'supports',
  'a',
  'rich',
  'set',
  'of',
  'higher-level',
  'tools',
  'including',
  'Spark',
  'SQL',
  'for',
  'SQL',
  'and',
  'structured',
  'data',
  'processing,',
  'MLlib',
  'for',
  'machine',
  'learning,',
  'GraphX',
  'for',
  'graph',
  'processing,',
  'and',
  'Spark',
  'Streaming']]

In [39]:
len(l[0])

58

In [40]:
# flatMap splits each line and flattens the per-line word lists in a single step
# (equivalent to map(split) followed by an identity flatMap).
flatten_rdd = rdd.flatMap(lambda line: line.split(' '))

In [41]:
flatten_rdd.collect()

['Apache',
 'Spark',
 'is',
 'a',
 'fast',
 'and',
 'general-purpose',
 'cluster',
 'computing',
 'system.',
 'It',
 'provides',
 'high-level',
 'APIs',
 'in',
 'Java,',
 'Scala,',
 'Python',
 'and',
 'R,',
 'and',
 'an',
 'optimized',
 'engine',
 'that',
 'supports',
 'general',
 'execution',
 'graphs.',
 'It',
 'also',
 'supports',
 'a',
 'rich',
 'set',
 'of',
 'higher-level',
 'tools',
 'including',
 'Spark',
 'SQL',
 'for',
 'SQL',
 'and',
 'structured',
 'data',
 'processing,',
 'MLlib',
 'for',
 'machine',
 'learning,',
 'GraphX',
 'for',
 'graph',
 'processing,',
 'and',
 'Spark',
 'Streaming']

In [42]:
flatten_rdd.count()

58

# Pair each word with 1 and use reduceByKey to sum the occurrences of each word

In [43]:
flatten_rdd.map(lambda x:(x,1)).collect()

[('Apache', 1),
 ('Spark', 1),
 ('is', 1),
 ('a', 1),
 ('fast', 1),
 ('and', 1),
 ('general-purpose', 1),
 ('cluster', 1),
 ('computing', 1),
 ('system.', 1),
 ('It', 1),
 ('provides', 1),
 ('high-level', 1),
 ('APIs', 1),
 ('in', 1),
 ('Java,', 1),
 ('Scala,', 1),
 ('Python', 1),
 ('and', 1),
 ('R,', 1),
 ('and', 1),
 ('an', 1),
 ('optimized', 1),
 ('engine', 1),
 ('that', 1),
 ('supports', 1),
 ('general', 1),
 ('execution', 1),
 ('graphs.', 1),
 ('It', 1),
 ('also', 1),
 ('supports', 1),
 ('a', 1),
 ('rich', 1),
 ('set', 1),
 ('of', 1),
 ('higher-level', 1),
 ('tools', 1),
 ('including', 1),
 ('Spark', 1),
 ('SQL', 1),
 ('for', 1),
 ('SQL', 1),
 ('and', 1),
 ('structured', 1),
 ('data', 1),
 ('processing,', 1),
 ('MLlib', 1),
 ('for', 1),
 ('machine', 1),
 ('learning,', 1),
 ('GraphX', 1),
 ('for', 1),
 ('graph', 1),
 ('processing,', 1),
 ('and', 1),
 ('Spark', 1),
 ('Streaming', 1)]

In [44]:
flatten_rdd.map(lambda x:(x,1)).reduceByKey(lambda x,y: x+y).collect()

[('is', 1),
 ('high-level', 1),
 ('an', 1),
 ('of', 1),
 ('GraphX', 1),
 ('graph', 1),
 ('and', 5),
 ('computing', 1),
 ('system.', 1),
 ('graphs.', 1),
 ('structured', 1),
 ('Streaming', 1),
 ('Spark', 3),
 ('in', 1),
 ('supports', 2),
 ('SQL', 2),
 ('fast', 1),
 ('cluster', 1),
 ('R,', 1),
 ('general', 1),
 ('also', 1),
 ('data', 1),
 ('provides', 1),
 ('Python', 1),
 ('engine', 1),
 ('set', 1),
 ('machine', 1),
 ('higher-level', 1),
 ('Apache', 1),
 ('It', 2),
 ('Java,', 1),
 ('Scala,', 1),
 ('tools', 1),
 ('that', 1),
 ('for', 3),
 ('general-purpose', 1),
 ('APIs', 1),
 ('optimized', 1),
 ('execution', 1),
 ('processing,', 2),
 ('MLlib', 1),
 ('learning,', 1),
 ('a', 2),
 ('rich', 1),
 ('including', 1)]

# New dataset - Kaggle Sofia Air Quality dataset

In [45]:
rdd_bme = sc.textFile('2017-07_bme280sof.csv')

In [46]:
type(rdd_bme)

pyspark.rdd.RDD

In [47]:
rdd_bme.count()

701549

In [48]:
rdd_bme.take(5)

['id,sensor_id,location,lat,lon,timestamp,pressure,temperature,humidity',
 '1,2266,1140,42.738,23.272,2017-07-01T00:00:07,95270.27,23.46,62.48',
 '5,2292,1154,42.663000000000004,23.273000000000003,2017-07-01T00:00:08,94355.83,23.06,59.46',
 '7,3096,1558,42.7,23.36,2017-07-01T00:00:10,95155.81,26.53,44.38',
 '9,3428,1727,42.623999999999995,23.406,2017-07-01T00:00:12,94679.57,28.34,38.28']

# Remove header

In [49]:
header = rdd_bme.first()

In [50]:
header

'id,sensor_id,location,lat,lon,timestamp,pressure,temperature,humidity'

In [51]:
# Drop the CSV header line by comparing each line to `header`.
# NOTE: this rebinds `rdd_bme`. If the cell that computes `header = rdd_bme.first()`
# is re-run after this one, `header` becomes the first *data* row and re-running
# this cell silently drops that row — restart and run the cells in order.
rdd_bme = rdd_bme.filter(lambda line:line!=header)

In [52]:
rdd_bme.take(5)

['1,2266,1140,42.738,23.272,2017-07-01T00:00:07,95270.27,23.46,62.48',
 '5,2292,1154,42.663000000000004,23.273000000000003,2017-07-01T00:00:08,94355.83,23.06,59.46',
 '7,3096,1558,42.7,23.36,2017-07-01T00:00:10,95155.81,26.53,44.38',
 '9,3428,1727,42.623999999999995,23.406,2017-07-01T00:00:12,94679.57,28.34,38.28',
 '10,3472,1750,42.669,23.318,2017-07-01T00:00:13,94327.88,26.31,46.37']

In [53]:
rdd_bme.count()

701548

# Split each line on ','

In [54]:
rdd_bme_split = rdd_bme.map(lambda x: x.split(','))

In [55]:
rdd_bme_split.take(1)

[['1',
  '2266',
  '1140',
  '42.738',
  '23.272',
  '2017-07-01T00:00:07',
  '95270.27',
  '23.46',
  '62.48']]

# Count the number of sensors

In [56]:
rdd_bme_split.map(lambda x: x[1]).distinct().take(5)

['2266', '3096', '3428', '3472', '1846']

In [57]:
rdd_bme_split.map(lambda x: x[1]).distinct().count()

56

In [58]:
rdd_bme_split.map(lambda x:x[1]).distinct().collect()

['2266',
 '3096',
 '3428',
 '3472',
 '1846',
 '2228',
 '1954',
 '3620',
 '3436',
 '3092',
 '2036',
 '1962',
 '2607',
 '2224',
 '3102',
 '2216',
 '2264',
 '3558',
 '2262',
 '1764',
 '2038',
 '3500',
 '3982',
 '3296',
 '3642',
 '3832',
 '4467',
 '4475',
 '4625',
 '4661',
 '2292',
 '1952',
 '3512',
 '3438',
 '3474',
 '2232',
 '3738',
 '2040',
 '3432',
 '2294',
 '2230',
 '1850',
 '2234',
 '3836',
 '2323',
 '3326',
 '3328',
 '3968',
 '4358',
 '4469',
 '4471',
 '4473',
 '4477',
 '4479',
 '4558',
 '4608']

# Find the number of records for sensor 2266

In [59]:
rdd_bme_split.filter(lambda row : row[1]=='2266').count()

17708

# Paired RDD (an RDD of key/value pairs)

In [60]:
# Key each already-split row by its sensor_id (column 1). keyBy(f) yields
# (f(row), row); reusing rdd_bme_split avoids splitting every line a second time.
rdd_bme_paired = rdd_bme_split.keyBy(lambda row: row[1])

In [61]:
rdd_bme_paired.take(2)

[('2266',
  ['1',
   '2266',
   '1140',
   '42.738',
   '23.272',
   '2017-07-01T00:00:07',
   '95270.27',
   '23.46',
   '62.48']),
 ('2292',
  ['5',
   '2292',
   '1154',
   '42.663000000000004',
   '23.273000000000003',
   '2017-07-01T00:00:08',
   '94355.83',
   '23.06',
   '59.46'])]

In [62]:
# Plain-Python analogue of countByKey: count how many times each value occurs.
# The loop variable has its own name so it does not overwrite the `l` word list
# created in the word-count section (which `len(l[0])` relies on).
numbers = [1, 1, 2, 2, 5, 3, 6, 3, 5, 4, 2, 1, 4, 6]
dictionary = {}
for number in numbers:
    # get(..., 0) covers the first occurrence; keys keep first-seen order.
    dictionary[number] = dictionary.get(number, 0) + 1

In [63]:
dictionary

{1: 3, 2: 3, 5: 2, 3: 2, 6: 2, 4: 2}

In [64]:
rdd_bme_paired.countByKey()

defaultdict(int,
            {'2266': 17708,
             '2292': 11003,
             '3096': 17201,
             '3428': 15258,
             '3472': 17931,
             '1952': 15383,
             '1846': 18184,
             '3512': 17070,
             '2228': 17893,
             '3438': 14333,
             '1954': 18201,
             '3620': 15696,
             '3436': 13028,
             '3092': 18152,
             '2036': 17982,
             '1962': 18216,
             '3474': 17941,
             '2232': 16145,
             '2607': 18156,
             '2224': 18025,
             '3738': 18132,
             '3102': 17845,
             '2040': 9762,
             '2216': 11661,
             '3432': 18084,
             '2294': 18079,
             '2230': 17617,
             '2264': 18208,
             '1850': 17703,
             '2234': 18008,
             '3558': 6998,
             '2262': 18184,
             '1764': 18198,
             '3836': 18108,
             '2038': 18053,
     

# Union

In [65]:
rdd1 = sc.parallelize([('Phoebe',26),('Monica',24),('Rachel',24)])
rdd2 = sc.parallelize([('Ross',26),('Chandler',26),('Joey',25)])

In [66]:
rdd_union = rdd1.union(rdd2)

In [67]:
rdd_union.collect()

[('Phoebe', 26),
 ('Monica', 24),
 ('Rachel', 24),
 ('Ross', 26),
 ('Chandler', 26),
 ('Joey', 25)]

# Join

In [68]:
rdd_emp = sc.parallelize([(1,'Williams'),(2,'Mark')])
rdd_dept = sc.parallelize([(1,'HR'),(2,'IT')])

In [69]:
rdd_emp.join(rdd_dept).collect()

[(1, ('Williams', 'HR')), (2, ('Mark', 'IT'))]

# Cartesian

In [70]:
rdd_orders = sc.parallelize([('01'),('02'),('03')])
rdd_products = sc.parallelize([('Soap'),('Shampoo'),('Conditioner')])

In [71]:
rdd_orders.cartesian(rdd_products).collect()

[('01', 'Soap'),
 ('01', 'Shampoo'),
 ('01', 'Conditioner'),
 ('02', 'Soap'),
 ('02', 'Shampoo'),
 ('02', 'Conditioner'),
 ('03', 'Soap'),
 ('03', 'Shampoo'),
 ('03', 'Conditioner')]

# Persist

In [72]:
rdd_bme.persist(pyspark.StorageLevel.MEMORY_AND_DISK)

PythonRDD[62] at RDD at PythonRDD.scala:53

In [73]:
rdd_bme.count()

701548

In [74]:
rdd_bme.unpersist()

PythonRDD[62] at RDD at PythonRDD.scala:53

# reduceByKey
 - Find the average pressure per sensor per day
 - Keep only the rows that have exactly 9 columns
 - Remove the rows where the pressure value is empty
 - Create key-value pairs with key = (sensor_id, date part of timestamp) and value = (pressure, 1)

In [75]:
# Keep well-formed rows (exactly 9 columns) that have a pressure reading, then key
# them by (sensor_id, YYYY-MM-DD) with value (pressure, 1) for averaging.
# The length check comes first in the `and`, so cols[6] is only read on full rows.
rdd_bme_mapped = (
    rdd_bme_split
    .filter(lambda cols: len(cols) == 9 and cols[6] != '')
    .map(lambda cols: ((cols[1], cols[5][:10]), (float(cols[6]), 1)))
)

In [76]:
rdd_bme_mapped.take(5)

[(('2266', '2017-07-01'), (95270.27, 1)),
 (('2292', '2017-07-01'), (94355.83, 1)),
 (('3096', '2017-07-01'), (95155.81, 1)),
 (('3428', '2017-07-01'), (94679.57, 1)),
 (('3472', '2017-07-01'), (94327.88, 1))]

In [77]:
rdd_bme_mapped.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1])).take(5)

[(('2292', '2017-07-01'), (54339274.579999946, 577)),
 (('1952', '2017-07-01'), (51929139.60000006, 546)),
 (('3512', '2017-07-01'), (54215394.09000003, 572)),
 (('3438', '2017-07-01'), (53622333.59000005, 565)),
 (('3474', '2017-07-01'), (54400704.23999998, 576))]

In [78]:
# Sum the (pressure, count) pairs per (sensor_id, date), then divide to get the mean.
# mapValues leaves the key untouched and preserves the partitioner, unlike map.
(
    rdd_bme_mapped
    .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
    .mapValues(lambda sum_count: sum_count[0] / sum_count[1])
    .take(5)
)

[(('2292', '2017-07-01'), 94175.51920277288),
 (('1952', '2017-07-01'), 95108.3142857144),
 (('3512', '2017-07-01'), 94782.15750000006),
 (('3438', '2017-07-01'), 94906.78511504433),
 (('3474', '2017-07-01'), 94445.6670833333)]

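# aggregateByKey
- Find the maximum temperature per sensor per day
- seqOp finds the max value for each key within a partition
- combOp merges the per-partition max values of each key across partitions
- zeroVal is the initial accumulator value for each key in each partition; it should be the identity for the operation (for a maximum, negative infinity), otherwise it can leak into the result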

In [79]:
# Well-formed rows with a temperature reading, keyed by (sensor_id, YYYY-MM-DD)
# with the temperature as the value.
rdd_bme_mapped_temp = (
    rdd_bme_split
    .filter(lambda cols: len(cols) == 9 and cols[7] != '')
    .map(lambda cols: ((cols[1], cols[5][:10]), float(cols[7])))
)

In [80]:
rdd_bme_mapped_temp.take(2)

[(('2266', '2017-07-01'), 23.46), (('2292', '2017-07-01'), 23.06)]

In [81]:
# Starting accumulator for a running maximum. It must be the identity for max:
# with 0, a key whose temperatures are all below zero would report 0 as its max.
zeroVal = float('-inf')

def seqOp(acc, element):
    """Fold one temperature into a key's partition-local running maximum.

    Args:
        acc: current maximum for the key in this partition (starts at zeroVal).
        element: next temperature value for that key.

    Returns:
        The larger of ``acc`` and ``element`` (``element`` on ties).
    """
    return acc if acc > element else element

def comOp(acc1, acc2):
    """Merge two per-partition maxima of the same key into one maximum."""
    return acc1 if acc1 > acc2 else acc2


In [82]:
rdd_bme_mapped_temp.aggregateByKey(zeroVal,seqOp,comOp).take(5)

[(('2292', '2017-07-01'), 43.38),
 (('1952', '2017-07-01'), 45.6),
 (('3512', '2017-07-01'), 52.07),
 (('3438', '2017-07-01'), 48.96),
 (('3474', '2017-07-01'), 41.09)]

# combineByKey
- Find the minimum humidity per sensor per day
- createCombiner turns the first value seen for a key in a partition into the initial accumulator (the role zeroVal plays in aggregateByKey)
- mergeValue folds each further value into that accumulator within a partition
- mergeCombiner merges the accumulators of the same key coming from different partitions

In [83]:
# Well-formed rows with a humidity reading, keyed by (sensor_id, YYYY-MM-DD)
# with the humidity as the value.
rdd_bme_mapped_humidity = (
    rdd_bme_split
    .filter(lambda cols: len(cols) == 9 and cols[8] != '')
    .map(lambda cols: ((cols[1], cols[5][:10]), float(cols[8])))
)

In [84]:
def createCombiner(humidity):
    """Start a key's accumulator from the first humidity value seen in a partition."""
    initial = humidity
    return initial

def mergeValue(acc, element):
    """Fold another humidity value into a key's partition-local running minimum.

    Returns ``acc`` when it is strictly smaller, otherwise ``element``.
    """
    smaller = acc if acc < element else element
    return smaller

def mergeCombiner(acc1, acc2):
    """Combine two partition-level minima of the same key into one minimum."""
    return acc1 if acc1 < acc2 else acc2
        

In [85]:
rdd_bme_mapped_humidity.combineByKey(createCombiner,mergeValue,mergeCombiner).take(5)

[(('2292', '2017-07-01'), 14.14),
 (('1952', '2017-07-01'), 15.93),
 (('3512', '2017-07-01'), 8.85),
 (('3438', '2017-07-01'), 8.76),
 (('3474', '2017-07-01'), 15.14)]

# Broadcast

In [86]:
b = sc.broadcast([1,2,3,4,5])
b.value

[1, 2, 3, 4, 5]

In [90]:
sc.parallelize([0,0]).flatMap(lambda x:b.value).collect()

[1, 2, 3, 4, 5, 1, 2, 3, 4, 5]

In [91]:
b.unpersist()

# Repartition

In [92]:
rdd = sc.parallelize([1,2,3,4,5,6,7],4)
sorted(rdd.glom().collect())

[[1], [2, 3], [4, 5], [6, 7]]

In [93]:
rdd.repartition(2).glom().collect()

[[1, 4, 5, 6, 7], [2, 3]]

In [94]:
len(rdd.repartition(2).glom().collect())

2

In [95]:
# Disabled demo: reshuffle the sensor RDD (~700k rows) into 1000 partitions.
# repartition always performs a full shuffle, so this is not cheap on the full file.
# rdd_bme2 = rdd_bme.repartition(1000)

In [96]:
# Disabled: glom().collect() would ship every partition — i.e. every row of the
# sensor data — back to the driver. To inspect the layout, use something small
# such as rdd_bme2.getNumPartitions() or rdd_bme2.glom().map(len).take(10).
# rdd_bme2.glom().collect()