## Overview

This notebook shows how to create and query a table or DataFrame from a file you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is the Databricks File System, which lets you store data for querying inside Databricks. The notebook assumes that the file you want to read is already in DBFS.

This notebook is written in **Python**, so the default cell type is Python. However, you can switch a cell to another language with the `%LANGUAGE` magic syntax (for example, `%sql`). Python, Scala, SQL, and R are all supported.

In [0]:
# File location and type
file_location = "/FileStore/tables/Employee_info_1.csv"
file_type = "csv"

# CSV options (Employee_info_1.csv starts with a header row: Id,Employee_id,City,State)
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)
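The `header` option decides whether the first CSV line supplies the column names. As a plain-Python sketch (standard library only, not Spark's implementation), the function below models both settings on a hypothetical three-line sample shaped like Employee_info_1.csv: with the header disabled, the header line is read as an ordinary data row and the columns get positional names in the `_c0`, `_c1`, ... style Spark uses.

```python
import csv
import io

# Hypothetical sample shaped like the first lines of Employee_info_1.csv
SAMPLE = "Id,Employee_id,City,State\n100,2555,Indore,MP\n200,2456,Indore,MP\n"

def read_csv_text(text, header, sep=","):
    """Return (column_names, rows), loosely modelling Spark's CSV `header` option."""
    rows = list(csv.reader(io.StringIO(text), delimiter=sep))
    if header:
        # header=true: the first row supplies the column names
        return rows[0], rows[1:]
    # header=false: every row is data, and columns get positional names
    names = [f"_c{i}" for i in range(len(rows[0]))]
    return names, rows

cols, data = read_csv_text(SAMPLE, header=False)
print(cols)     # ['_c0', '_c1', '_c2', '_c3']
print(data[0])  # ['Id', 'Employee_id', 'City', 'State'] (header read as data)
```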

In [0]:
# Create a view or table

temp_table_name = "Employee_info_1_csv"

df.createOrReplaceTempView(temp_table_name)

In [0]:
%sql

/* Query the created temp table in a SQL cell */

select * from `Employee_info_1_csv`

In [0]:
# Registered as a temp view, the data is only available to this notebook. If you'd like other users to be able to query it, create a table from the DataFrame instead.
# Once saved, the table persists across cluster restarts and can be queried by other users from other notebooks.
# To do so, choose a table name and uncomment the last line.

permanent_table_name = "Employee_info_1_csv"

# df.write.format("parquet").saveAsTable(permanent_table_name)

In [0]:
"""
Assignment 1 Spark ~ Aman Sir

Create two DataFrames and join them.

Create a DataFrame and replace the null values of a column with a meaningful value using withColumn.

Note: Use the CSV files below to create the DataFrames.

CSV FILES - Employee_info.csv & Employee_info_1.csv
"""

In [0]:
# Import PySpark
import pyspark

# Create (or reuse) the Spark session; on Databricks, getOrCreate() returns the existing `spark` session
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Assignment').getOrCreate()

# Show the session details
spark

In [0]:
# Read a single CSV file, inferring the column types from the data
df1 = spark.read.format("csv") \
  .option("inferSchema", True) \
  .option("header", True) \
  .option("sep", ",") \
  .load("/FileStore/tables/Employee_info.csv")

display(df1)

print(df1.count())

Id,Name,Department
100,Aman,Data Engineering
200,Saurabh,Data Engineering
300,Mohit,DavOps
400,Kashif,DAvOps
500,Eniya,DAvOps
600,Anand,DAvOps
700,Murali,Data Engineering
800,Ramesh,Null
900,Suresh,Null
1000,Himanshu,Null


10


In [0]:
df1.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Department: string (nullable = true)
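With `inferSchema` enabled, Spark makes an extra pass over the data to choose a type for each column, which is how `Id` came out as `integer` above. The sketch below is a deliberately simplified plain-Python model (integer-or-string only; Spark also infers types such as double and timestamp): a column is typed integer only if every non-empty value parses as one. It also shows that the literal text `Null` used in these files is just another string to the inference step.

```python
def infer_type(values):
    """Return 'integer' if every non-empty value parses as an int, else 'string'."""
    for v in values:
        if v == "":
            continue  # empty fields do not decide the type in this sketch
        try:
            int(v)
        except ValueError:
            return "string"
    return "integer"

def infer_schema(header, rows):
    """Map each column name to the type inferred from that column's values."""
    return {name: infer_type([row[i] for row in rows]) for i, name in enumerate(header)}

# Two rows taken from Employee_info.csv
header = ["Id", "Name", "Department"]
rows = [["100", "Aman", "Data Engineering"], ["800", "Ramesh", "Null"]]
print(infer_schema(header, rows))   # {'Id': 'integer', 'Name': 'string', 'Department': 'string'}

# Hypothetical column: the text "Null" is not an integer, so the column becomes string
print(infer_type(["900", "Null"]))  # string
```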



In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema_defined1 = StructType([StructField('Id', IntegerType(), True),
                             StructField('Name', StringType(),True),
                             StructField('Department', StringType(),True),
                            ])
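Supplying a `StructType` skips the inference pass; each field is parsed as its declared type instead. The plain-Python sketch below models that step with a hypothetical `SCHEMA` list of (name, cast function) pairs mirroring `schema_defined1`. Empty or uncastable fields become `None`, which is only a rough stand-in for Spark's default `PERMISSIVE` parse mode, not an exact reproduction of it.

```python
# Hypothetical declared schema: (column name, cast function), mirroring schema_defined1
SCHEMA = [("Id", int), ("Name", str), ("Department", str)]

def apply_schema(rows, schema):
    """Cast each field to its declared type; empty or uncastable fields become None."""
    typed = []
    for row in rows:
        record = {}
        for (name, cast), raw in zip(schema, row):
            if raw == "":
                record[name] = None
                continue
            try:
                record[name] = cast(raw)
            except ValueError:
                # Rough stand-in for PERMISSIVE mode nulling a malformed field
                record[name] = None
        typed.append(record)
    return typed

# The second row is hypothetical: "x9" cannot be cast to an integer
for record in apply_schema([["100", "Aman", "Data Engineering"], ["x9", "Ramesh", ""]], SCHEMA):
    print(record)
```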

In [0]:
# Read the same file with the explicit schema instead of inferring it
df_a = spark.read.format("csv") \
  .schema(schema_defined1) \
  .option("header", True) \
  .option("sep", ",") \
  .load("/FileStore/tables/Employee_info.csv")
display(df_a)

Id,Name,Department
100,Aman,Data Engineering
200,Saurabh,Data Engineering
300,Mohit,DavOps
400,Kashif,DAvOps
500,Eniya,DAvOps
600,Anand,DAvOps
700,Murali,Data Engineering
800,Ramesh,Null
900,Suresh,Null
1000,Himanshu,Null


In [0]:
# Read the second CSV file, inferring the column types from the data
df2 = spark.read.format("csv") \
  .option("inferSchema", True) \
  .option("header", True) \
  .option("sep", ",") \
  .load("/FileStore/tables/Employee_info_1.csv")

display(df2)

print(df2.count())

Id,Employee_id,City,State
100,2555,Indore,MP
200,2456,Indore,MP
300,3265,Surat,GJ
400,7896,Banglore,KA
500,4562,Banglore,KA
600,8524,Banglore,KA
700,6666,Banglore,KA
800,9853,Banglore,KA
900,1594,Null,JK
1000,7894,Null,JK


10


In [0]:
df2.printSchema()

root
 |-- Id: integer (nullable = true)
 |-- Employee_id: integer (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)



In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema_defined2 = StructType([StructField('Id',IntegerType(), True),
                             StructField('Employee_id',IntegerType(),True),
                             StructField('City',StringType(),True),
                             StructField('State',StringType(),True),
                            ])

In [0]:
# Read Employee_info_1.csv with the explicit schema and display df_b (not df_a)
df_b = spark.read.format("csv") \
  .schema(schema_defined2) \
  .option("header", True) \
  .option("sep", ",") \
  .load("/FileStore/tables/Employee_info_1.csv")
display(df_b)

Id,Employee_id,City,State
100,2555,Indore,MP
200,2456,Indore,MP
300,3265,Surat,GJ
400,7896,Banglore,KA
500,4562,Banglore,KA
600,8524,Banglore,KA
700,6666,Banglore,KA
800,9853,Banglore,KA
900,1594,Null,JK
1000,7894,Null,JK


In [0]:
"""
We now have two DataFrames with explicitly defined schemas: df_a and df_b.
"""

In [0]:
# Full outer join on Id. Joining on an expression keeps both Id columns (display shows the second one as Id.1)
df_join = df_a.join(df_b, df_a.Id == df_b.Id, "outer")
display(df_join)

Id,Name,Department,Id.1,Employee_id,City,State
100,Aman,Data Engineering,100,2555,Indore,MP
200,Saurabh,Data Engineering,200,2456,Indore,MP
300,Mohit,DavOps,300,3265,Surat,GJ
400,Kashif,DAvOps,400,7896,Banglore,KA
500,Eniya,DAvOps,500,4562,Banglore,KA
600,Anand,DAvOps,600,8524,Banglore,KA
700,Murali,Data Engineering,700,6666,Banglore,KA
800,Ramesh,Null,800,9853,Banglore,KA
900,Suresh,Null,900,1594,Null,JK
1000,Himanshu,Null,1000,7894,Null,JK
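In this data every `Id` from 100 to 1000 appears in both files, so the full outer join returns the same ten rows an inner join would. The plain-Python sketch below uses hypothetical placeholder rows (Id 1200 only on the left, Id 1100 only on the right) to show what the outer part adds: unmatched keys are kept and the other side's columns are filled with `None`. It emits a single `Id` key, which matches what `df_a.join(df_b, on="Id", how="outer")` produces in PySpark instead of the duplicated `Id`/`Id.1` pair.

```python
# Hypothetical rows keyed by Id; only Id 100 exists on both sides
left = {100: {"Name": "Aman"}, 1200: {"Name": "Left only"}}
right = {100: {"City": "Indore"}, 1100: {"City": "Right only"}}

def full_outer_join(left, right):
    """Join two {key: row} dicts; a key missing on one side gets None for that side's columns."""
    left_cols = sorted({c for row in left.values() for c in row})
    right_cols = sorted({c for row in right.values() for c in row})
    result = []
    for key in sorted(left.keys() | right.keys()):
        row = {"Id": key}  # a single key column, like joining with on="Id"
        for c in left_cols:
            row[c] = left.get(key, {}).get(c)
        for c in right_cols:
            row[c] = right.get(key, {}).get(c)
        result.append(row)
    return result

for row in full_outer_join(left, right):
    print(row)
```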


In [0]:
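# Replace the literal string 'Null' in City with a meaningful value.
# The CSV stores the text "Null" rather than a real null, so regexp_replace can match it as text.
# withColumn returns a new DataFrame; df_join itself is not modified.
from pyspark.sql.functions import regexp_replace
df_join.withColumn('City', regexp_replace('City', 'Null', 'Srinagar')) \
  .show(truncate=False)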

+----+--------+----------------+----+-----------+--------+-----+
|Id  |Name    |Department      |Id  |Employee_id|City    |State|
+----+--------+----------------+----+-----------+--------+-----+
|100 |Aman    |Data Engineering|100 |2555       |Indore  |MP   |
|200 |Saurabh |Data Engineering|200 |2456       |Indore  |MP   |
|300 |Mohit   |DavOps          |300 |3265       |Surat   |GJ   |
|400 |Kashif  |DAvOps          |400 |7896       |Banglore|KA   |
|500 |Eniya   |DAvOps          |500 |4562       |Banglore|KA   |
|600 |Anand   |DAvOps          |600 |8524       |Banglore|KA   |
|700 |Murali  |Data Engineering|700 |6666       |Banglore|KA   |
|800 |Ramesh  |Null            |800 |9853       |Banglore|KA   |
|900 |Suresh  |Null            |900 |1594       |Srinagar|JK   |
|1000|Himanshu|Null            |1000|7894       |Srinagar|JK   |
+----+--------+----------------+----+-----------+--------+-----+
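`regexp_replace` treats its second argument as a regular expression and replaces every match inside a value, not only values that equal `Null`. Python's `re.sub` behaves the same way for this simple pattern, so the sketch below uses it to contrast the unanchored pattern with an anchored `^Null$`; `Nullah` is a hypothetical value added only to show the difference. In Spark, `regexp_replace('City', '^Null$', 'Srinagar')` or `when(col('City') == 'Null', 'Srinagar').otherwise(col('City'))` would restrict the change to exact matches.

```python
import re

# "Nullah" is hypothetical: a value that merely contains the text "Null"
values = ["Indore", "Null", "Nullah"]

# Unanchored pattern: matches "Null" anywhere in the value
substring = [re.sub("Null", "Srinagar", v) for v in values]

# Anchored pattern: matches only a value that is exactly "Null"
whole = [re.sub("^Null$", "Srinagar", v) for v in values]

print(substring)  # ['Indore', 'Srinagar', 'Srinagarah']
print(whole)      # ['Indore', 'Srinagar', 'Nullah']
```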



In [0]:
# Same approach for Department: replace the literal 'Null' with 'Other'
from pyspark.sql.functions import regexp_replace
df_join.withColumn('Department', regexp_replace('Department', 'Null', 'Other')) \
  .show(truncate=False)

+----+--------+----------------+----+-----------+--------+-----+
|Id  |Name    |Department      |Id  |Employee_id|City    |State|
+----+--------+----------------+----+-----------+--------+-----+
|100 |Aman    |Data Engineering|100 |2555       |Indore  |MP   |
|200 |Saurabh |Data Engineering|200 |2456       |Indore  |MP   |
|300 |Mohit   |DavOps          |300 |3265       |Surat   |GJ   |
|400 |Kashif  |DAvOps          |400 |7896       |Banglore|KA   |
|500 |Eniya   |DAvOps          |500 |4562       |Banglore|KA   |
|600 |Anand   |DAvOps          |600 |8524       |Banglore|KA   |
|700 |Murali  |Data Engineering|700 |6666       |Banglore|KA   |
|800 |Ramesh  |Other           |800 |9853       |Banglore|KA   |
|900 |Suresh  |Other           |900 |1594       |Null    |JK   |
|1000|Himanshu|Other           |1000|7894       |Null    |JK   |
+----+--------+----------------+----+-----------+--------+-----+
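Because these files store the text `Null` rather than real nulls, null-handling functions such as `DataFrame.fillna` would leave those cells unchanged. An alternative, assuming you control the read, is to pass `.option("nullValue", "Null")` so Spark loads them as real nulls, and then fill per column with `df_join.fillna({"City": "Srinagar", "Department": "Other"})`. The plain-Python sketch below models those two steps (the helpers `parse_nulls` and `fill_nulls` are hypothetical names) on the Suresh and Aman rows from the data above.

```python
def parse_nulls(row, null_value="Null"):
    """Turn the literal null marker into a real None, like the CSV nullValue option."""
    return {k: (None if v == null_value else v) for k, v in row.items()}

def fill_nulls(row, defaults):
    """Replace None with a per-column default, like DataFrame.fillna with a dict."""
    return {k: (defaults.get(k, v) if v is None else v) for k, v in row.items()}

raw = [{"Name": "Suresh", "Department": "Null", "City": "Null"},
       {"Name": "Aman", "Department": "Data Engineering", "City": "Indore"}]
defaults = {"Department": "Other", "City": "Srinagar"}

# Filling alone changes nothing: "Null" is still an ordinary string here
print(fill_nulls(raw[0], defaults))

# Parsing the marker first turns it into None, which filling then replaces
cleaned = [fill_nulls(parse_nulls(r), defaults) for r in raw]
print(cleaned)
```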

