# Questions

In [34]:
# Coding Assessment:
# Implement processing of JSON and CSV data with PySpark
# Explain ETL (Extract, Transform, Load) with PySpark
# Using Spark SQL - Creating databases and tables
# Using Spark SQL - Transformations such as Filter, Join, Simple Aggregations, and GroupBy

### Answer to Question 1: Implement processing of JSON and CSV data with PySpark

In [35]:
# Processing JSON and CSV data with PySpark

# Set up the SparkSession used by the cells below. getOrCreate() returns the
# already-running session if there is one, so re-running this cell does not
# start a second session.
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("pyspark_coding").getOrCreate()


In [36]:
# Read the marks CSV into a DataFrame.
#   header=True      -> column names come from the file's first line
#   inferSchema=True -> Spark makes an extra pass over the data to infer column types
read_csv_data = spark.read.csv(
    "Marks_data.csv",
    header=True,
    inferSchema=True,
)
# Display the rows (show() prints at most 20 rows by default).
read_csv_data.show(n=20)

+----+--------+--------+---+
|Name|M1 Score|M2 Score|age|
+----+--------+--------+---+
|Alex|      62|      80| 20|
|Brad|      45|      56| 19|
|Joey|      85|      98| 21|
|NULL|      54|      79| 20|
|abhi|    NULL|    NULL| 20|
+----+--------+--------+---+



In [37]:
# Column names taken from the CSV header row. Names that contain spaces
# (e.g. "M1 Score") must be wrapped in backticks when used in SQL expressions.
read_csv_data.columns

['Name', 'M1 Score', 'M2 Score', 'age']

In [38]:
# Read the JSON file into a DataFrame.
# By default spark.read.json expects JSON Lines (one complete object per line);
# a pretty-printed, multi-line document would need .option("multiLine", True).
read_json_data = (
    spark.read
    .json("sample_json.json")
)
# Print the inferred schema as a tree: column name, type, and nullability.
read_json_data.printSchema()


root
 |-- cgpa: double (nullable = true)
 |-- name: string (nullable = true)
 |-- phonenumber: string (nullable = true)
 |-- rollno: long (nullable = true)



In [39]:
# Display the JSON rows, spelling out show()'s defaults: up to 20 rows, with
# cell values longer than 20 characters truncated.
read_json_data.show(n=20, truncate=True)

+----+-------+-----------+------+
|cgpa|   name|phonenumber|rollno|
+----+-------+-----------+------+
|8.18|Abhiram| 9976770500|   580|
+----+-------+-----------+------+



### Answer to Question 2: Explain ETL (Extract, Transform, Load) with PySpark

In [40]:
# ETL process on a sample data set
#
#   Extract   - read the raw salary.csv into a DataFrame (this cell).
#   Transform - add a derived column, filter rows, and sort them (following cells).
#   Load      - write the transformed result back out as CSV (last ETL cell).

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,concat,lit,floor,rand
spark = SparkSession.builder.appName("pyspark_coding").getOrCreate()

# Extract: header=True takes column names from the first line of the file;
# inferSchema=True makes an extra pass over the data to infer column types.
read_etl = spark.read.csv("salary.csv", header=True, inferSchema=True)
read_etl.show()

+------+---+---+----------+------+
|  name| id|age|department|salary|
+------+---+---+----------+------+
| user1|  1| 25|Jr manager| 98000|
| user2|  2| 30|sr manager|100000|
| user3|  6| 35|sr manager|100000|
| user4|  4| 32|      head| 70000|
| user5|  1| 45|Jr manager| 60000|
| user6|  6| 47|     head2| 45000|
| user7|  5| 21|    worker| 25000|
| user8|  1| 22|Jr manager| 50000|
| user9| 10| 54|      lead| 45000|
|user10| 59| 52|     lead2| 50000|
|user11|  6| 25|     head2| 50000|
|user12|  2| 27|sr manager| 70000|
|user13| 59| 54|     lead2| 45000|
|user14|  2| 25|sr manager| 70000|
|user15|  1| 32|Jr manager| 50000|
|user16|  3| 37|    worker| 25000|
|user17| 74| 63|   Manager| 68000|
|user18|  7| 25|      head| 45000|
|user19| 10| 32| lvl2 head| 52000|
|user20| 10| 32| lvl2 head| 52000|
+------+---+---+----------+------+



In [46]:
 # Transform step 1: add a synthetic 'Avg_sal' column.
# NOTE: despite its name this is NOT an average of `salary`. It is a random
# whole number in [AVG_SAL_BASE, AVG_SAL_BASE + AVG_SAL_SPREAD), because rand()
# draws from [0.0, 1.0). Seeding rand() makes the generated values repeatable
# across re-runs (for the same input partitioning); unseeded, every run differs.
# NOTE(review): the output below also shows a 'Total_sal' column that no visible
# cell creates. It presumably came from a since-deleted cell, and a fresh-kernel
# run will not show it.
AVG_SAL_BASE = 25000
AVG_SAL_SPREAD = 10000
AVG_SAL_SEED = 42
read_etl = read_etl.withColumn(
    'Avg_sal',
    floor(lit(AVG_SAL_BASE) + rand(seed=AVG_SAL_SEED) * lit(AVG_SAL_SPREAD)),
)
read_etl.show(10)



+------+---+---+----------+------+---------+-------+
|  name| id|age|department|salary|Total_sal|Avg_sal|
+------+---+---+----------+------+---------+-------+
| user1|  1| 25|Jr manager| 98000|    50032|  29724|
| user2|  2| 30|sr manager|100000|    50015|  25605|
| user3|  6| 35|sr manager|100000|    50037|  25137|
| user4|  4| 32|      head| 70000|    50012|  29389|
| user5|  1| 45|Jr manager| 60000|    50035|  32277|
| user6|  6| 47|     head2| 45000|    50030|  29663|
| user7|  5| 21|    worker| 25000|    50047|  26395|
| user8|  1| 22|Jr manager| 50000|    50033|  31826|
| user9| 10| 54|      lead| 45000|    50038|  32459|
|user10| 59| 52|     lead2| 50000|    50038|  34986|
+------+---+---+----------+------+---------+-------+
only showing top 10 rows



In [48]:
# Transform step 2: keep only employees aged 50 or older
# (where() is an alias of filter()).
read_etl = read_etl.where(col("age") >= 50)
read_etl.show()

+------+---+---+----------+------+---------+-------+
|  name| id|age|department|salary|Total_sal|Avg_sal|
+------+---+---+----------+------+---------+-------+
| user9| 10| 54|      lead| 45000|    50038|  32459|
|user10| 59| 52|     lead2| 50000|    50038|  34986|
|user13| 59| 54|     lead2| 45000|    50019|  25204|
|user17| 74| 63|   Manager| 68000|    50047|  30134|
+------+---+---+----------+------+---------+-------+



In [49]:
# Transform step 3: sort the remaining rows by age, ascending
# (sort() is an alias of orderBy()). Rows with equal age have no
# guaranteed relative order.
read_etl = read_etl.sort("age")
read_etl.show()

+------+---+---+----------+------+---------+-------+
|  name| id|age|department|salary|Total_sal|Avg_sal|
+------+---+---+----------+------+---------+-------+
|user10| 59| 52|     lead2| 50000|    50038|  34986|
| user9| 10| 54|      lead| 45000|    50038|  32459|
|user13| 59| 54|     lead2| 45000|    50019|  25204|
|user17| 74| 63|   Manager| 68000|    50047|  30134|
+------+---+---+----------+------+---------+-------+



In [None]:
# Load: write the transformed dataset back out to storage as CSV.
# Spark writes a *directory* at target_path, containing part-*.csv file(s) and a
# _SUCCESS marker, rather than one standalone CSV file. coalesce(1) collapses
# this small result into a single partition, so the directory holds a single
# part file. Avoid coalesce(1) for large data: it funnels all rows through one task.
# Use a portable relative path rather than a Windows string like "c:desktop\...":
# that form is drive-relative (not the Desktop folder), and "\s" is an invalid
# escape sequence in a normal Python string literal.
target_path = "salary_result"
read_etl.coalesce(1).write.csv(target_path, mode='overwrite', header=True)

### Answer to Question 3: Using Spark SQL - Creating databases and tables

In [59]:
# Creating a database and a queryable table with Spark SQL
from pyspark.sql import SparkSession

# Get the Spark session (getOrCreate reuses an already-running one, if any)
spark = SparkSession.builder.appName("pyspark_coding").getOrCreate()

# Create the database only if it does not exist yet, so re-running is safe
database_name = "coding_database"
spark.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")

# Make it the current database for unqualified table names in later SQL
spark.sql(f"USE {database_name}")

# Build a DataFrame from in-memory Python rows (via createDataFrame, not spark.sql)
data = [("Abhi", "python"), ("ram", "c++"), ("cherry", "java")]
columns = ["Name", "lang"]
df = spark.createDataFrame(data, columns)

# Register the DataFrame as a temporary *view* named "Coding_table" so SQL can query it.
# NOTE: a temp view exists only for this SparkSession and is not stored inside
# coding_database. A persistent table would need e.g. df.write.saveAsTable(...).
df.createOrReplaceTempView("Coding_table")

# Run a SQL query against the temporary view
result = spark.sql("SELECT * FROM Coding_table")
result.show()


+------+------+
|  Name|  lang|
+------+------+
|  Abhi|python|
|   ram|   c++|
|cherry|  java|
+------+------+



### Answer to Question 4: Using Spark SQL - Transformations such as Filter, Join, Simple Aggregations, and GroupBy

In [None]:
# Question 4 setup: reload the raw salary data into a fresh DataFrame,
# independent of read_etl (which the ETL cells filtered and re-sorted).
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat, lit, floor, rand

spark = SparkSession.builder.appName("pyspark_coding").getOrCreate()

agg_data = spark.read.csv(
    "salary.csv",
    header=True,
    inferSchema=True,
)
# Preview only the first 5 rows.
agg_data.show(5)

In [67]:
#performing filter 

# Filter with a SQL expression string: keep rows whose department is exactly 'sr manager'.
filter_col = agg_data.filter("department = 'sr manager'")
filter_col.show()

+------+---+---+----------+------+
|  name| id|age|department|salary|
+------+---+---+----------+------+
| user2|  2| 30|sr manager|100000|
| user3|  6| 35|sr manager|100000|
|user12|  2| 27|sr manager| 70000|
|user14|  2| 25|sr manager| 70000|
+------+---+---+----------+------+



In [94]:
# Joins: combine two DataFrames on a shared key column.
from pyspark.sql import SparkSession

# Reuse (or start) the Spark session.
spark = SparkSession.builder.appName("pyspark_coding").getOrCreate()

# Left side: which language each person codes in.
lang_rows = [("Abhi", "python"), ("ram", "c++"), ("cherry", "java"), ("arun", "java")]
df1 = spark.createDataFrame(lang_rows, ["Name", "lang"])

# Right side: the role each person holds.
role_rows = [("Abhi", "Engineer"), ("arun", "Analyst"), ("kiran", "Manager")]
df2 = spark.createDataFrame(role_rows, ["Name", "Role"])

# Inner join on Name: only names present in BOTH DataFrames appear in the result,
# and the shared key appears once as a single "Name" column.
join_col = df1.join(df2, on="Name", how="inner")
join_col.show()



+----+------+--------+
|Name|  lang|    Role|
+----+------+--------+
|Abhi|python|Engineer|
|arun|  java| Analyst|
+----+------+--------+



In [77]:
# aggregate function: max() -- highest salary within each department
by_department = agg_data.groupBy("department")
filter_col2 = by_department.max("salary")
filter_col2.show()

+----------+-----------+
|department|max(salary)|
+----------+-----------+
|Jr manager|      98000|
|      head|      70000|
|sr manager|     100000|
|     head2|      50000|
| lvl2 head|      52000|
|      lead|      45000|
|   Manager|      68000|
|    worker|      25000|
|     lead2|      50000|
+----------+-----------+



In [78]:
# aggregate function: min() -- lowest salary within each department
by_department = agg_data.groupBy("department")
filter_col3 = by_department.min("salary")
filter_col3.show()

+----------+-----------+
|department|min(salary)|
+----------+-----------+
|Jr manager|      50000|
|      head|      45000|
|sr manager|      70000|
|     head2|      45000|
| lvl2 head|      52000|
|      lead|      45000|
|   Manager|      68000|
|    worker|      25000|
|     lead2|      45000|
+----------+-----------+



In [82]:
# aggregate function: avg() -- mean salary within each department
by_department = agg_data.groupBy("department")
filter_col3 = by_department.avg("salary")
filter_col3.show()

+----------+-----------+
|department|avg(salary)|
+----------+-----------+
|Jr manager|    64500.0|
|      head|    57500.0|
|sr manager|    85000.0|
|     head2|    47500.0|
| lvl2 head|    52000.0|
|      lead|    45000.0|
|   Manager|    68000.0|
|    worker|    25000.0|
|     lead2|    47500.0|
+----------+-----------+



In [83]:

# aggregate function: sum() -- total salary paid within each department
by_department = agg_data.groupBy("department")
filter_col3 = by_department.sum("salary")
filter_col3.show()

+----------+-----------+
|department|sum(salary)|
+----------+-----------+
|Jr manager|     258000|
|      head|     115000|
|sr manager|     340000|
|     head2|      95000|
| lvl2 head|     104000|
|      lead|      45000|
|   Manager|      68000|
|    worker|      50000|
|     lead2|      95000|
+----------+-----------+



In [84]:
# aggregate function: mean()
# mean() is an alias of avg(): the result column is still named "avg(salary)",
# and the values are the same as those from the avg() cell.
filter_col3 = agg_data.groupBy("department").mean("salary")
filter_col3.show()

+----------+-----------+
|department|avg(salary)|
+----------+-----------+
|Jr manager|    64500.0|
|      head|    57500.0|
|sr manager|    85000.0|
|     head2|    47500.0|
| lvl2 head|    52000.0|
|      lead|    45000.0|
|   Manager|    68000.0|
|    worker|    25000.0|
|     lead2|    47500.0|
+----------+-----------+



In [None]:
# agg_data.show()