In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=938a08d9ece2ab8ff8ec8f3ac5836438f090253eccbd0794e0958f914698ecf7
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [4]:
import pandas as pd
# Create a sample CSV data
data={
    "name": ["John","Jane","Mike","Emily","Alex"],
    "age":[28,32,45,23,36],
    "gender":["Male","Female","Male","Female","Male"],
    "Salary":[60000,72000,84000,52000,67000]
}

df=pd.DataFrame(data)

# save the dataframe to a new csv file
csv_file_path="/content/sample_people.csv"
df.to_csv(csv_file_path, index=False)

# confirmed the csv file is created
print(f"CSV file created at: {csv_file_path}")

CSV file created at: /content/sample_people.csv


In [6]:
# 1. Extract
# Load the employee data from a CSV file containing the following columns: name, age, gender, and salary
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("EmployeeSalaryAnalysis").getOrCreate()
df=spark.read.csv("/content/sample_people.csv",header=True,inferSchema=True)

# 2. Transform
# filter employees aged 30 and above
df_filtered=df.filter(col('age')>30)
df_filtered.show()

# Add a new column (salary_with_bonus) with a 10% bonus on the salary
from pyspark.sql.functions import round
df_with_bonus=df_filtered.withColumn("Salary_with_bonus", round(col("Salary")*0.1,2))

# Group by gender and calculate average salary
avg_salary_by_gender=df_with_bonus.groupBy("gender").avg("Salary").withColumnRenamed("avg(Salary)","avg_salary")


# 3. Load
# Save the transformed data (including the bonus salary) in a Parquet file format for efficient storage and retrieval.
df_with_bonus.write.parquet("/content/convertedData.parquet")

df_with_bonus.show(truncate=False)

avg_salary_by_gender.show(truncate=False)

+----+---+------+------+
|name|age|gender|Salary|
+----+---+------+------+
|Jane| 32|Female| 72000|
|Mike| 45|  Male| 84000|
|Alex| 36|  Male| 67000|
+----+---+------+------+

+----+---+------+------+-----------------+
|name|age|gender|Salary|Salary_with_bonus|
+----+---+------+------+-----------------+
|Jane|32 |Female|72000 |7200.0           |
|Mike|45 |Male  |84000 |8400.0           |
|Alex|36 |Male  |67000 |6700.0           |
+----+---+------+------+-----------------+

+------+----------+
|gender|avg_salary|
+------+----------+
|Female|72000.0   |
|Male  |75500.0   |
+------+----------+



In [7]:
spark=SparkSession.builder.appName("SparkSQLExample").getOrCreate()

In [11]:
# Full refresh: Load the entire dataset
df_sales=spark.read.format("csv")\
.option("header","true")\
.option("inferSchema","true")\
.load("/content/sales_data.csv")

# Apply transformed (if necessary)
df_transformed=df_sales.withColumn("total_sales",df_sales["quantity"]*df_sales["price"])

# Full refresh: Partition the data by 'date' and overwrite the existing data
output_path="/content/partitioned_data"
df_transformed.write.partitionBy("date").mode("overwrite").parquet(output_path)

# verify partitioned data
partitioned_df=spark.read.parquet(output_path)
partitioned_df.show()

+--------------+-----------+--------+--------+-----+-------------------+-----------+----------+
|transaction_id|customer_id| product|quantity|price|         updated_at|total_sales|      date|
+--------------+-----------+--------+--------+-----+-------------------+-----------+----------+
|             1|        101|  Laptop|       1| 1000|2024-09-01 08:00:00|       1000|2024-09-01|
|             2|        102|   Phone|       2|  500|2024-09-01 09:00:00|       1000|2024-09-01|
|             5|        105|Keyboard|       1|   50|2024-09-03 12:00:00|         50|2024-09-03|
|             6|        106|   Mouse|       3|   30|2024-09-03 13:00:00|         90|2024-09-03|
|             3|        103|  Tablet|       1|  300|2024-09-02 10:00:00|        300|2024-09-02|
|             4|        104| Monitor|       2|  200|2024-09-02 11:00:00|        400|2024-09-02|
+--------------+-----------+--------+--------+-----+-------------------+-----------+----------+



In [13]:
from pyspark.sql import functions as F
last_etl_run='2024-09-01 00:00:00'
# Full refresh: Load the entire dataset
df_sales=spark.read.format("csv")\
.option("header","true")\
.option("inferSchema","true")\
.load("/content/sales_data.csv")\
.filter(F.col("updated_at")>last_etl_run)

# Apply transformed (if necessary)
df_transformed=df_sales.withColumn("total_sales",df_sales["quantity"]*df_sales["price"])

# Full refresh: Partition the data by 'date' and overwrite the existing data
output_path="/content/partitioned_data"
df_transformed.write.partitionBy("date").mode("append").parquet(output_path)

# verify partitioned data
partitioned_df=spark.read.parquet(output_path)
partitioned_df.show()

+--------------+-----------+--------+--------+-----+-------------------+-----------+----------+
|transaction_id|customer_id| product|quantity|price|         updated_at|total_sales|      date|
+--------------+-----------+--------+--------+-----+-------------------+-----------+----------+
|             1|        101|  Laptop|       1| 1000|2024-09-01 08:00:00|       1000|2024-09-01|
|             2|        102|   Phone|       2|  500|2024-09-01 09:00:00|       1000|2024-09-01|
|             1|        101|  Laptop|       1| 1000|2024-09-01 08:00:00|       1000|2024-09-01|
|             2|        102|   Phone|       2|  500|2024-09-01 09:00:00|       1000|2024-09-01|
|             1|        101|  Laptop|       1| 1000|2024-09-01 08:00:00|       1000|2024-09-01|
|             2|        102|   Phone|       2|  500|2024-09-01 09:00:00|       1000|2024-09-01|
|             5|        105|Keyboard|       1|   50|2024-09-03 12:00:00|         50|2024-09-03|
|             6|        106|   Mouse|   

In [14]:
pip install ipywidgets

Collecting jedi>=0.16 (from ipython>=4.0.0->ipywidgets)
  Using cached jedi-0.19.1-py2.py3-none-any.whl.metadata (22 kB)
Using cached jedi-0.19.1-py2.py3-none-any.whl (1.6 MB)
Installing collected packages: jedi
Successfully installed jedi-0.19.1


In [20]:
from pyspark.sql import SparkSession
import ipywidgets as widgets
from IPython.display import display

# Step-1: Initialize a Spark session
spark=SparkSession.builder.appName("PySpark with widgets").getOrCreate()

# Step-2: Create a simple dataframe
data= [
    ("John",28,"Male",60000),
    ("Jane",32,"Female",72000),
    ("Mike",45,"Male",84000),
    ("Emily",23,"Female",52000),
    ("Alex",36,"Male",67000)
]

df=spark.createDataFrame(data,['name','age','gender','salary'])

df.show()

# Drop down widget to select column for filtering
column_dropdown=widgets.Dropdown(
    options=["age","salary"],
    value="salary",
    description="Filter by: "
)

# Slider widget to choose a value for filtering
slider=widgets.IntSlider(
    value=30,
    min=20,
    max=100,
    step=5,
    description="Threshold",
    continuous_update=False
)

# Button to trigger filtering
button=widgets.Button(description='Apply filter')

# output area to show the results
output=widgets.Output()

# Display the widgets
display(column_dropdown,slider,button, output)

def apply_filter(b):
  column=column_dropdown.value
  threshold=slider.value

  output.clear_output()

  df_filtered=df.filter(df[column] > threshold)

  with output:
    print(f"Filtering by {column} >  {threshold}")
    df_filtered.show()

button.on_click(apply_filter)

+-----+---+------+------+
| name|age|gender|salary|
+-----+---+------+------+
| John| 28|  Male| 60000|
| Jane| 32|Female| 72000|
| Mike| 45|  Male| 84000|
|Emily| 23|Female| 52000|
| Alex| 36|  Male| 67000|
+-----+---+------+------+



Dropdown(description='Filter by: ', index=1, options=('age', 'salary'), value='salary')

IntSlider(value=30, continuous_update=False, description='Threshold', min=20, step=5)

Button(description='Apply filter', style=ButtonStyle())

Output()