# Installing Pyspark

In [None]:
!pip install pyspark py4j

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


# creating spark Session

In [None]:
from pyspark.sql import SparkSession

# Build (or reuse) a session that runs against the 'local' master under the app name 'demo'.
spark = (
    SparkSession.builder
    .master('local')
    .appName('demo')
    .getOrCreate()
)
spark

# Convert a string date (dd-MM-yyyy) to Spark's date format (yyyy-MM-dd)

In [None]:
# Source strings are day-month-year (dd-MM-yyyy); Spark dates are shown as yyyy-MM-dd.
# In Spark datetime patterns 'MM' is month-of-year while lowercase 'mm' is
# minute-of-hour. Parsing with 'dd-mm-yyyy' therefore drops the month and every
# date comes back in January, so the month must be written as 'MM'.

from pyspark.sql.functions import *

df1 = (
    df.withColumn('HIREDATE', to_date('HIREDATE', 'dd-MM-yyyy'))
      .withColumn('UPDATED_DATE', to_date('UPDATED_DATE', 'dd-MM-yyyy'))
)

df1.show(5)
df1.printSchema()

+-----+------+--------+----+----------+----+----+------+------------+
|EMPNO| ENAME|     JOB| MGR|  HIREDATE| SAL|COMM|DEPTNO|UPDATED_DATE|
+-----+------+--------+----+----------+----+----+------+------------+
| 7369| SMITH|   CLERK|7902|1980-01-17| 800| 300|    10|  2022-01-01|
| 7499| ALLEN|SALESMAN|7698|1981-01-20|1600| 300|    20|  2022-01-01|
| 7521|  WARD|SALESMAN|7698|1981-01-22|1250| 500|    30|  2022-01-01|
| 7566| JONES| MANAGER|7839|1981-01-04|2975|null|    40|  2022-01-05|
| 7654|MARTIN|SALESMAN|7698|1981-01-21|1250|1400|    10|  2022-01-03|
+-----+------+--------+----+----------+----+----+------+------------+
only showing top 5 rows

root
 |-- EMPNO: integer (nullable = true)
 |-- ENAME: string (nullable = true)
 |-- JOB: string (nullable = true)
 |-- MGR: integer (nullable = true)
 |-- HIREDATE: date (nullable = true)
 |-- SAL: integer (nullable = true)
 |-- COMM: integer (nullable = true)
 |-- DEPTNO: integer (nullable = true)
 |-- UPDATED_DATE: date (nullable = true)



# Adding new columns with different values

In [None]:
# Derive a 'Levels' column from JOB. The when() chain is evaluated in order; there is
# no otherwise(), so rows whose JOB matches none of the branches get null.
# (The withColumn(...) call was previously left unclosed, which is a SyntaxError.)
df3 = df2.withColumn(
    'Levels',
    when(col('JOB') == 'SALESMAN', 'Level3')
    .when(col('JOB') == 'CLERK', 'Level2')
    .when(col('JOB') == 'MANAGER', 'Level1'),
)
df3.show()

SyntaxError: ignored

# Creating Data Frame from mysql table

In [None]:
"""
df_mysql = spark.read.format('jdbc').\
           option('url','jdbc:mysql://localhost:3306').\
           option('driver','com.mysql.jdbc.Driver').\
           option('user','root').\
           option('password','sandeep').\
           option('query','select * from sandeep.emp_table').\
           load()
"""


# Json file Handling


complex Data types

1.struct - dict

2.array -  list - To flatten a complex datatype (array datatype) we can use the explode() function

3.map




In [None]:
# Load /content/emp.json through the generic reader with the 'json' format.
data = (
    spark.read
    .format('json')
    .load('/content/emp.json')
)

data.show()
data.printSchema()
data.count()


# Creating DataFrame from multiLine Json file

In [None]:
# When one JSON record spans several lines (nested / pretty-printed data),
# the 'multiline' option must be enabled on the reader.
Mul_data = (
    spark.read.format('json')
    .option('multiline', True)
    .option('inferSchema', True)
    .option('nullValue', 'null')
    .load('/content/nested_json.json')
)

Mul_data.show(truncate=False)

Mul_data.printSchema()
Mul_data.count()

In [None]:
# Flatten the two nested arrays: explode each into one row per element, copy the
# element's id/type into top-level columns, then drop the nested originals.
Mul_data1 = (
    Mul_data
    .withColumn('batters_exp', explode('batters.batter'))
    .withColumn('batters_id', col('batters_exp.id'))
    .withColumn('batters_type', col('batters_exp.type'))
    .drop('batters', 'batters_exp')
    .withColumn('topping_exp', explode('topping'))
    .withColumn('topping_id', col('topping_exp.id'))
    .withColumn('topping_type', col('topping_exp.type'))
    .drop('topping', 'topping_exp')
)

Mul_data1.show(10)

Mul_data1.printSchema()

Mul_data1.count()

# Functions in pyspark

In [None]:
from pyspark.sql.functions import *

# 'show functions' returns a DataFrame listing the functions known to this session.
fun = spark.sql('show functions')
print(fun.count())
# show() prints the table itself and returns None, so wrapping it in print()
# only added a stray "None" line to the output.
fun.show()

In [None]:
# Report the Python type currently bound to `fun`.
fun_type = type(fun)
print(fun_type)

# Show the description Spark SQL returns for the function named 'aggregate'.
aggregate_details = spark.sql('describe function aggregate')
aggregate_details.show(truncate=False)

# Creating dataframe from Hive table



In [None]:

# DSL (domain-specific language) style: the table is read through the DataFrame
# reader API instead of a SQL string.
hive = spark.read.table(tableName='oracle_db.emp_data')

hive.show(n=10)
hive.printSchema()
hive.count()


# Adding current timestamp to dataframe

In [None]:
# Append a 'Date' column holding the current timestamp expression.
load_timestamp = current_timestamp()
hive1 = hive.withColumn('Date', load_timestamp)
hive1.show(truncate=False)

# Save Dataframe to Hive

In [None]:
# Persist hive1 as table 'Emp_Hive', partitioned on HIREDATE.
(
    hive1.write
    .partitionBy('HIREDATE')
    .saveAsTable('Emp_Hive')
)


In [None]:
# Read the saved table back with SQL and display it.
emp_hive_rows = spark.sql('select * from emp_hive')
emp_hive_rows.show()

# word count program step by step

In [None]:
# Load the text file as an RDD with one element per line.
rd = spark.sparkContext.textFile("/content/word.txt")

print(type(rd))
print(rd.collect())   # the lines themselves
print(rd.count())     # no. of lines

In [None]:
# Encode every line to UTF-8 bytes.
rd1 = rd.map(lambda line: line.encode('utf-8'))

print(rd1.collect())

In [None]:
# map keeps one output element per input line, so the result is a list of word lists.
rd2 = rd.map(lambda line: line.split(' '))

print(type(rd2))
print(rd2.collect())

In [None]:
# flatMap merges the per-line word lists into a single flat collection of words.
rd3 = rd.flatMap(lambda line: line.split(' '))

print(rd3.collect())

In [None]:
# Pair each word with the count 1 -> (word, 1).
rd4 = rd3.map(lambda word: (word, 1))

print(rd4.collect())

In [None]:
# reduceByKey groups the (word, 1) pairs from rd4 by word and sums their counts.
rd5 = rd4.reduceByKey(lambda left, right: left + right)

print(rd5.collect())

# word count in single line - Find No.of Occurrences of each word

In [None]:
# Whole word count as one pipeline: read lines -> split into words ->
# pair each word with 1 -> sum the counts per word.
RDD = (
    spark.sparkContext.textFile('/content/word.txt')
    .flatMap(lambda line: line.split(' '))
    .map(lambda word: (word, 1))
    .reduceByKey(lambda left, right: left + right)
)

RDD.collect()

[('The', 1),
 ('forest', 1),
 ('raven', 3),
 ('also', 2),
 ('commonly', 1),
 ('known', 1),
 ('as', 3),
 ('the', 4),
 ('Tasmanian', 2),
 ('is', 1),
 ('a', 2),
 ('passerine', 1),
 ('bird', 1),
 ('in', 3),
 ('family', 1),
 ('Corvidae', 1),
 ('native', 1),
 ('to', 2),
 ('Tasmania', 1),
 ('and', 5),
 ('parts', 2),
 ('of', 3),
 ('southern', 1),
 ('Victoria', 1),
 ('such', 1),
 ('Wilsons', 1),
 ('Promontory', 1),
 ('Portland', 1),
 ('Populations', 1),
 ('are', 2),
 ('found', 1),
 ('New', 2),
 ('South', 2),
 ('Wales', 2),
 ('including', 1),
 ('Dorrigo', 1),
 ('Armidale', 1),
 ('it', 1),
 ('has', 1),
 ('allblack', 1),
 ('plumage', 1),
 ('beak', 1),
 ('legs', 1),
 ('As', 1),
 ('with', 2),
 ('other', 1),
 ('two', 1),
 ('species', 1),
 ('Australia,', 1),
 ('its', 1),
 ('black', 1),
 ('feathers', 1),
 ('have', 3),
 ('grey', 1),
 ('bases', 1),
 ('Adults', 1),
 ('white', 1),
 ('irises;', 1),
 ('younger', 1),
 ('birds', 1),
 ('dark', 1),
 ('brown', 1),
 ('then', 1),
 ('hazel', 1),
 ('irises', 1),
 ('a

# fill missing data in textFile and convert into Dataframe

In [None]:
# Read the space-separated text file as CSV, then replace nulls in string columns
# with the placeholder 'no data'.
df_miss = (
    spark.read.format('csv')
    .option('sep', ' ')
    .load('/content/fill missing.txt')
    .fillna('no data')
)

df_miss.show()



+-----+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|  _c0|    _c1|    _c2|    _c3|    _c4|    _c5|    _c6|    _c7|    _c8|    _c9|   _c10|   _c11|
+-----+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|Spark|  Spark|  Spark|  Spark|  Spark|  Spark|  Spark|  Spark|  Spark|  Spark|  Spark|no data|
|Spark|  Spark|  Spark|  Spark|  Spark|  Spark|  Spark|  Spark|  Spark|  Spark|no data|no data|
|Spark|  Spark|  Spark|  Spark|  Spark|  Spark|  Spark|  Spark|  Spark|no data|no data|no data|
|Spark|  Spark|  Spark|  Spark|  Spark|  Spark|  Spark|  Spark|no data|no data|no data|no data|
|Spark|  Spark|  Spark|  Spark|  Spark|  Spark|  Spark|no data|no data|no data|no data|no data|
|Spark|  Spark|  Spark|  Spark|  Spark|  Spark|no data|no data|no data|no data|no data|no data|
|Spark|  Spark|  Spark|  Spark|  Spark|no data|no data|no data|no data|no data|no data|no data|
|Spark|  Spark|  Spark|  Spark|no data|n

# Registering a UDF

In [None]:
from pyspark.sql.types import *

# UDF

def f1(x):
  """Return x*x - x, or None when x is None.

  This function is registered as a Spark UDF, where a SQL NULL arrives as
  Python None. Propagating None avoids a TypeError on null rows.
  """
  if x is None:
    return None
  return ((x*x)-x)

# Register f1 for SQL under the name 'fun', and also build a DataFrame-API UDF
# bound to the Python name `fun`. Both declare an integer return type.
spark.udf.register(name='fun', f=f1, returnType=IntegerType())
fun = udf(f=f1, returnType=IntegerType())

# Plain-Python sanity check of the wrapped function.
print(f1(5))

In [None]:
# Show each employee name with the square root of SAL and the UDF applied to SAL.
sal_sqrt = sqrt('SAL')
sal_fun = fun('SAL')
df5.select('ENAME', sal_sqrt, sal_fun).show()

# how to handle yy date format in pyspark for before 2000 data

In [None]:
# Pipe-delimited file with a header row; column types are inferred and the literal
# text 'null' is read as a null value.
yy_df = (
    spark.read.format('csv')
    .option('header', True)
    .option('sep', '|')
    .option('inferSchema', True)
    .option('nullValue', 'null')
    .load('/content/emp_pipe_yy.txt')
)

yy_df.show()
yy_df.printSchema()

In [None]:

# By default Spark resolves a two-digit 'yy' year to a year after 2000.
#
# Setting spark.sql.legacy.timeParserPolicy to LEGACY changes how 'yy' is resolved so
# that pre-2000 dates can be parsed. This is only practical when a few columns are
# affected; for larger feeds, ask the source system to send a proper date format.

spark.conf.set('spark.sql.legacy.timeParserPolicy','LEGACY')

# 'MM' is month-of-year; lowercase 'mm' means minute-of-hour and would drop the month.
yy_df.withColumn('DATE',to_date('UPDATED_DATE','dd-MM-yy')).show()

# how to handle duplicate column error

In [None]:
# Both CSV files are read with the same settings: header row, inferred column types,
# and the literal text 'null' treated as a null value.
csv_options = {'header': True, 'inferSchema': True, 'nullValue': 'null'}

dept = spark.read.format('csv').options(**csv_options).load('/content/dept.csv')
emp = spark.read.format('csv').options(**csv_options).load('/content/emp.csv')

dept.show()
emp.show()

In [None]:
# Inner join of emp and dept on the department number; dept's 'depno' column is
# dropped after the join.
dept_match = emp['DEPTNO'] == dept['depno']
emp_dept = emp.join(dept, dept_match, 'inner').drop('depno')

emp_dept.show()




In [None]:

# Save the joined DataFrame as the table 'emp_dept_table'.
# NOTE(review): no format is specified here, so the session's default data source is
# used - confirm that it is Delta if a Delta table is really intended.
(
    emp_dept.write
    .saveAsTable('emp_dept_table')
)

In [None]:
# Row count of the saved table, computed in SQL.
emp_dept_count = spark.sql('select count(*) from emp_dept_table')
emp_dept_count.show()

# how to handle bad data



In [None]:
# Read channels.csv with inferred types to inspect what the raw data looks like.
bad = (
    spark.read.format('csv')
    .option('header', True)
    .option('inferSchema', True)
    .option('nullValue', 'null')
    .load('/content/channels.csv')
)

bad.show()

bad.schema

Spark read Mode:

1. PERMISSIVE - allows bad data; this is Spark's default mode

2. FAILFAST - does not allow bad data; it raises an exception and stops processing

3. DROPMALFORMED - drops bad records based on the schema; the bad records are not saved

4. badRecordsPath - saves good data in the table and writes bad records to another path

In [None]:
from pyspark.sql.types import *

# Custom schema for channels.csv. Besides the six data columns it carries one extra
# nullable string column ('BadData') that is meant to receive the corrupt record.
channel_columns = [
    ('CHANNEL_ID', IntegerType()),
    ('CHANNEL_DESC', StringType()),
    ('CHANNEL_CLASS', StringType()),
    ('CHANNEL_CLASS_ID', IntegerType()),
    ('CHANNEL_TOTAL', StringType()),
    ('CHANNEL_TOTAL_ID', IntegerType()),
    ("BadData", StringType()),
]
schema = StructType(
    [StructField(col_name, col_type, True) for col_name, col_type in channel_columns]
)

In [None]:
# PERMISSIVE mode keeps malformed rows; ColumnNameOfCorruptRecord names the schema
# column ('BadData') that receives the raw text of each bad record.

bad1 = (
    spark.read.format('csv')
    .schema(schema)
    .option('Mode', 'PERMISSIVE')
    .option('ColumnNameOfCorruptRecord', 'BadData')
    .option('header', True)
    .option('nullValue', 'null')
    .load('/content/channels.csv')
)
bad1.show()

# filter good records
# The dropped column is spelled exactly as in the schema ('BadData'); the previous
# 'BAdData' only worked because column resolution is case-insensitive by default.
goodData = bad1.filter('BadData is Null').drop('BadData')
goodData.show()

# filter corrupt records
bad3 = bad1.filter('BadData is Not Null')
bad3.show()

In [None]:
# FAILFAST mode: read channels.csv against the custom schema.
bad = (
    spark.read.format('csv')
    .schema(schema)
    .option('mode', 'FAILFAST')
    .option('header', True)
    .option('nullValue', 'null')
    .load('/content/channels.csv')
)
bad.show()


In [None]:
# DROPMALFORMED mode: read channels.csv against the custom schema.
bad = (
    spark.read.format('csv')
    .schema(schema)
    .option('mode', 'DROPMALFORMED')
    .option('header', True)
    .option('nullValue', 'null')
    .load('/content/channels.csv')
)
bad.show()

# Difference Between sort and order by



1. Spark SQL: `ORDER BY` sorts the entire dataset, whereas `SORT BY` sorts the rows within each partition.


2. PySpark: `orderBy` and `sort` are the same; `sortWithinPartitions` is the equivalent of `SORT BY` (it sorts within each partition).



In [None]:
# Load the saved table files, repartition into 4 partitions keyed on DEPTNO, and tag
# each row with the id of the partition it landed in.
ta_df = (
    spark.read.load('/content/spark-warehouse/oracle_db.db/emp_dept_table')
    .repartition(4, 'DEPTNO')
    .withColumn('partition', spark_partition_id())
)

ta_df.show()

In [None]:
# In PySpark, orderBy and sort are the same operation.

# orderBy
sal_ordered = ta_df.orderBy('SAL')
sal_ordered.show()


In [None]:
# sort
sal_sorted = ta_df.sort('SAL')
sal_sorted.show()

In [None]:
# sortWithinPartitions is the DataFrame counterpart of SQL's SORT BY:
# rows are sorted inside each partition only.
sal_sorted_per_partition = ta_df.sortWithinPartitions('SAL')
sal_sorted_per_partition.show()

In [None]:
# Expose ta_df to Spark SQL as the temporary view 'ta_df'.
ta_df.createOrReplaceTempView(name='ta_df')

In [None]:
# ORDER BY sorts the entire dataset.
sal_order_by = spark.sql('select * from ta_df order by SAL')
sal_order_by.show()

In [None]:
# SORT BY sorts within each partition only.
sal_sort_by = spark.sql('select * from ta_df sort by SAL')
sal_sort_by.show()

# coalesce and repartition in rdd

coalesce: a narrow transformation. It merges data into existing partitions without shuffling, so by default it is used to decrease the number of partitions.
To increase the number of partitions, pass `True` as the second argument; the data is then shuffled.

repartition: a wide transformation. It creates new partitions and shuffles the data; it can be used to increase or decrease the number of partitions.


In [None]:

from pyspark import SparkContext

# Obtain a SparkContext handle via getOrCreate() for the RDD examples that follow.
sc = SparkContext.getOrCreate()

In [None]:
# Start from 10 numbers spread over 5 partitions.
rdd = sc.parallelize(range(10),5)

# coalesce(n): reduce the number of partitions; no shuffle happens.
rdd1 = rdd.coalesce(2)

# coalesce(n, True): the second argument forces a shuffle. That flag is what coalesce
# needs in order to increase the partition count; here 5 -> 4 is still a decrease.
rdd2 = rdd.coalesce(4,True)

# repartition(n) takes no shuffle flag and always shuffles; here it decreases ...
rdd3 = rdd.repartition(2)

# ... and here it increases the number of partitions.
rdd4 = rdd.repartition(6)

# Note: coalesce() and repartition() both require numPartitions; calling either one
# with no arguments raises TypeError, so no bare calls are made here.

# glom() groups the elements of each partition into a list so the layout is visible.
print('original rdd', rdd.glom().collect())
print('coalesce 2 ',rdd1.glom().collect())
print('coalesce 4',rdd2.glom().collect())
print('repartition 2',rdd3.glom().collect())
print('repartition 6',rdd4.glom().collect())

original rdd [[0, 1], [2, 3], [4, 5], [6, 7], [8, 9]]
coalesce 2  [[0, 1, 2, 3], [4, 5, 6, 7, 8, 9]]
coalesce 4 [[4, 5], [0, 1, 2, 3], [], [6, 7, 8, 9]]
repartition 2 [[4, 5, 6, 7, 8, 9], [0, 1, 2, 3]]
repartition 6 [[6, 7], [], [8, 9], [0, 1], [2, 3], [4, 5]]


# coalesce and repartition in dataframe

coalesce: a narrow transformation. It merges data into existing partitions without shuffling, so by default it is used to decrease the number of partitions.


repartition: a wide transformation. It creates new partitions and shuffles the data; it can be used to increase or decrease the number of partitions.
We can also repartition on a specific column to improve performance.

In [None]:
# Load the saved table files and report how many partitions the DataFrame starts with.
cr_df = spark.read.load('/content/spark-warehouse/oracle_db.db/emp_dept_table')

initial_partitions = cr_df.rdd.getNumPartitions()
print(initial_partitions)
cr_df.show()

In [None]:
# Repartition into 4 partitions and record which partition each row landed in.
cr_df1 = (
    cr_df
    .repartition(4)
    .withColumn('partition_id', spark_partition_id())
)
cr_df1.show()

In [None]:

# Repartitioning on the column used for joins/filters can improve performance:
# here 4 partitions keyed on DEPTNO, with each row tagged by its partition id.
cr_df2 = (
    cr_df
    .repartition(4, 'DEPTNO')
    .withColumn('partition_id', spark_partition_id())
)
cr_df2.show()

In [None]:
# Coalesce to 3 partitions and record which partition each row landed in.
cr_df3 = (
    cr_df
    .coalesce(3)
    .withColumn('partition_id', spark_partition_id())
)
cr_df3.show()

In [None]:
# coalesce() and repartition() both need a target: calling them with no arguments
# raises TypeError. The partition counts below are illustrative values only.
df.coalesce(1)
df.repartition(2)

# Creating Data Frame from REST API

In [None]:
# REST API - accessing data over the internet through URLs

import requests
import json


# The timeout keeps the cell from hanging indefinitely on a stalled connection.
api = requests.request('GET','https://api.github.com/users/hadley/orgs', timeout=30)
# Fail loudly on an HTTP error instead of writing an error payload to disk.
api.raise_for_status()

data = api.json()

# Write one JSON document per line, which is what the json reader below consumes.
# - json.dumps emits real JSON; "%s" % record would write the Python dict repr.
# - Mode 'w' rewrites the file on every run instead of appending duplicates.
# - The with-block closes (and so flushes) the file before Spark reads it; leaving
#   it open meant Spark could read an empty file.
with open('/content/sample_data/apidata.json','w') as file:
  for record in data:
    file.write(json.dumps(record) + "\n")

api_df = spark.read.format('json').load('/content/sample_data/apidata.json')


In [None]:
# Decode the response body once, then report its Python type and its length.
api_payload = api.json()
print(type(api_payload))
print(len(api_payload))

<class 'list'>
10


In [None]:
# Preview up to 10 rows, print the schema, and return the row count as the cell result.
api_df.show(n=10)
api_df.printSchema()
api_df.count()

++
||
++
++

root



0