In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType, DateType
from pyspark.sql import Window as W
import pyspark.sql.functions as F

# Start (or reuse) the Spark session that every later cell queries through.
spark = (
    SparkSession.builder
    .appName("30 Days SQL")
    .getOrCreate()
)
# Bare expression so the notebook renders the session summary.
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/10 17:04:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
# Day 1 (https://www.youtube.com/watch?v=FRzbOb3jdLg) "HARD"
# Sales layout: two nullable string brand columns followed by five nullable
# integer columns (year and custom1..custom4).
schema = StructType(
    [StructField(column, StringType(), True) for column in ("brand1", "brand2")]
    + [
        StructField(column, IntegerType(), True)
        for column in ("year", "custom1", "custom2", "custom3", "custom4")
    ]
)

# One tuple per Sales row, in schema order.
data = [
    ("apple",   "samsung", 2020, 1, 2, 1, 2),
    ("samsung", "apple",   2020, 1, 2, 1, 2),
    ("apple",   "samsung", 2021, 1, 2, 5, 3),
    ("samsung", "apple",   2021, 5, 3, 1, 2),
    ("google",  "",        2020, 5, 9, None, None),
    ("oneplus", "nothing", 2020, 5, 9, 6, 3),
]

# Build the frame, expose it to SQL as the "Sales" temp view, then print it.
df = spark.createDataFrame(data=data, schema=schema)
df.createOrReplaceTempView("Sales")
df.show()


In [None]:
# Day 1 query: drop the mirrored duplicate of a brand pair within a year, but
# only when the two custom-value pairs match.
#   cte  - rewrites each row so brand1 = least(brand1, brand2) and
#          brand2 = greatest(brand1, brand2); mirrored pairs such as
#          (apple, samsung) / (samsung, apple) end up with identical brands.
#   cte2 - fl = 1 when custom1 = custom3 and custom2 = custom4, fl = 0 when
#          either pair differs. The CASE has no ELSE, so fl is NULL when
#          neither branch is true (e.g. the row whose custom3/custom4 are NULL).
#   cte3 - rows with fl = 1 are numbered inside their (brand1, brand2, year, fl)
#          group; every other row gets rnk = 0.
#   final - rnk <= 1 keeps every non-matching row plus the first row of each
#          matching group.
# NOTE(review): the row with brand2 = '' comes out with brand1 = '' and
# brand2 = 'google', because '' is the smaller string - confirm this is wanted.
spark.sql("""
with cte as (
    select 
    least(brand1,brand2) as brand1, 
    greatest(brand1,brand2) as brand2,
    year,custom1,custom2,custom3,custom4
    from sales)
,
cte2 as (
    select *, 
    case 
    when (custom1 = custom3) and (custom2 = custom4)  then 1 
    when (custom1 != custom3) or (custom2 != custom4) then 0 
    end as fl 
    from cte)
,
cte3 as (
    select *, 
    if(fl = 1, row_number() over(partition by  brand1, brand2, year, fl order by brand1, brand2, year, fl), 0) as rnk
    from cte2)
select * from cte3 where rnk <=1
""").show()

In [None]:
# Day2 https://www.youtube.com/watch?v=FRzbOb3jdLg
# NOTE(review): this is the same link as the Day 1 cell - confirm the URL.

# --- mountain_huts: one row per hut (id, name, altitude) ---
mountain_huts_schema = StructType([
    StructField(column, kind, True)
    for column, kind in (
        ("id", IntegerType()),
        ("name", StringType()),
        ("altitude", IntegerType()),
    )
])

mountain_huts_data = [
    (1, 'Dakonat', 1900), (2, 'Natisa', 2100), (3, 'Gajantut', 1600),
    (4, 'Rifat', 782), (5, 'Tupur', 1370),
]

# Build, print, and register the huts table for SQL.
df_mountain_huts = spark.createDataFrame(mountain_huts_data, mountain_huts_schema)
df_mountain_huts.show()
df_mountain_huts.createOrReplaceTempView("mountain_huts")

# --- trails: one row per pair of hut ids (hut1, hut2) ---
trails_schema = StructType([
    StructField(column, IntegerType(), True) for column in ("hut1", "hut2")
])

trails_data = [(1, 3), (3, 2), (3, 5), (4, 5), (1, 5)]

# Build, print, and register the trails table for SQL.
df_trails = spark.createDataFrame(trails_data, trails_schema)
df_trails.show()
df_trails.createOrReplaceTempView("trails")

In [None]:
# Day 2 query: list every chain of two trails that share a hut
# (start -> via -> end).
#   cte  - attaches hut name and altitude to both ends of every trail.
#   cte2 - orients each trail so that `start` is the end with the lower
#          altitude (the two ends are swapped when start_altitude > end_altitude).
#          NOTE(review): this makes every trail point uphill - confirm that is
#          the intended direction for the exercise.
#   cte3 - self-joins the oriented trails where one trail's end is the next
#          trail's start. The shared hut is exposed as via / via_name /
#          via_altitude so the result has no duplicate column names; without
#          the aliases both t1.end* and t2.end* would be called end*.
spark.sql("""
with cte as (
    select
    hut1 as start, m1.name as start_name, m1.altitude as start_altitude,
    hut2 as end, m2.name as end_name, m2.altitude as end_altitude
    from trails t1
    left join
    mountain_huts m1 on
    t1.hut1 = m1.id
    left join
    mountain_huts m2 on
    t1.hut2 = m2.id)
,
cte2 as (
    select
    if(start_altitude > end_altitude, end, start) as start,
    if(start_altitude > end_altitude, end_name, start_name) as start_name,
    if(start_altitude > end_altitude, end_altitude, start_altitude) as start_altitude,
    if(start_altitude > end_altitude, start, end) as end,
    if(start_altitude > end_altitude, start_name, end_name) as end_name,
    if(start_altitude > end_altitude, start_altitude, end_altitude) as end_altitude
    from cte)
,
cte3 as (
    select
    t1.start, t1.start_name, t1.start_altitude,
    t1.end as via, t1.end_name as via_name, t1.end_altitude as via_altitude,
    t2.end, t2.end_name, t2.end_altitude
    from cte2 t1
    inner join
    cte2 t2 on t1.end = t2.start
)
select * from cte3
""").show()

In [None]:
# footer layout: integer id, string car name, then three nullable integer
# dimensions.
footer_schema = StructType(
    [
        StructField("id", IntegerType(), True),
        StructField("car", StringType(), True),
    ]
    + [
        StructField(dimension, IntegerType(), True)
        for dimension in ("length", "width", "height")
    ]
)

# Sparse rows: each row fills in only some of the columns.
footer_data = [
    (1, 'Hyundai Tucson', 15,   6,    None),
    (2, None,             None, None, 20),
    (3, None,             12,   8,    15),
    (4, 'Toyota Rav4',    None, 15,   None),
    (5, 'Kia Sportage',   None, None, 18),
]

# Build, print, and register the table for SQL as "footer".
df_footer = spark.createDataFrame(footer_data, footer_schema)
df_footer.show()
df_footer.createOrReplaceTempView("footer")

In [None]:
# Footer query: build one row holding, for each column, the non-null value
# that belongs to the highest id.
# Each CTE orders the whole table by id descending and takes first_value over
# the full frame (unbounded preceding .. unbounded following); the second
# argument `true` asks first_value to ignore NULLs. Every row therefore gets
# the same value and `distinct` collapses the CTE to a single row.
# Cross-joining the four single-row CTEs yields the final one-row result.
spark.sql("""
with car as (
    select distinct first_value(car, true) over(order by id desc rows between unbounded preceding and unbounded following) as car from footer
),
length as (
    select distinct first_value(length, true) over(order by id desc rows between unbounded preceding and unbounded following) as length from footer
),
width as (
    select distinct first_value(width, true) over(order by id desc rows between unbounded preceding and unbounded following)  as width from footer
),
height as (
    select distinct first_value(height, true) over(order by id desc rows between unbounded preceding and unbounded following) as height from footer
)
select * from car cross join length cross join width cross join height
""").show()

In [None]:
# --- salary: one row per employee with a base salary ---
salary_schema = StructType([
    StructField(column, kind, True)
    for column, kind in (
        ("emp_id", IntegerType()),
        ("emp_name", StringType()),
        ("base_salary", IntegerType()),
    )
])

salary_data = [(1, 'Rohan', 5000), (2, 'Alex', 6000), (3, 'Maryam', 7000)]

df_salary = spark.createDataFrame(salary_data, salary_schema)
df_salary.show()

# --- income: income component name and its percentage ---
income_schema = StructType([
    StructField(column, kind, True)
    for column, kind in (
        ("id", IntegerType()),
        ("income", StringType()),
        ("percentage", IntegerType()),
    )
])

income_data = [(1, 'Basic', 100), (2, 'Allowance', 4), (3, 'Others', 6)]

df_income = spark.createDataFrame(income_data, income_schema)
df_income.show()

# --- deduction: deduction component name and its percentage ---
deduction_schema = StructType([
    StructField(column, kind, True)
    for column, kind in (
        ("id", IntegerType()),
        ("deduction", StringType()),
        ("percentage", IntegerType()),
    )
])

deduction_data = [(1, 'Insurance', 5), (2, 'Health', 6), (3, 'House', 4)]

df_deduction = spark.createDataFrame(deduction_data, deduction_schema)
df_deduction.show()

# Expose all three frames to SQL under the names the queries use.
for view_name, frame in (
    ("salary", df_salary),
    ("income", df_income),
    ("deduction", df_deduction),
):
    frame.createOrReplaceTempView(view_name)

In [None]:
# Transactions query: one row per (employee, component).
#   income_transactions    - pairs every salary row with every income row
#                            (cross join); amount = base_salary * (percentage/100).
#   deduction_transactions - same pairing and formula against the deduction table.
# The final UNION ALL stacks both sets; the component name is exposed as
# trans_type in each branch.
spark.sql("""
with income_transactions as (
    select emp_id, emp_name, income as trans_type, base_salary * (percentage/100) as amount
    from salary cross join income
),
deduction_transactions as (
    select emp_id, emp_name, deduction as trans_type, base_salary * (percentage/100) as amount
    from salary cross join deduction
)

select * from income_transactions union all
select * from deduction_transactions

""").show()

In [None]:
# Payslip query: pivot the per-component transactions into one row per employee.
# The first three CTEs rebuild the same income/deduction transaction set as the
# previous cell (salary cross-joined with income and with deduction).
# The final select groups by the first select column (emp_name) and uses
# conditional sums:
#   Basic / Allowance / Others    - the individual income components
#   Gross                         - sum of the three income components
#   Insurance / Health / House    - the individual deduction components
#   Total_Deduction               - sum of the three deduction components
#   Netpay                        - Gross minus Total_Deduction (recomputed inline)
spark.sql("""
with income_transactions as (
    select emp_id, emp_name, income as trans_type, base_salary * (percentage/100) as amount
    from salary cross join income
),
deduction_transactions as (
    select emp_id, emp_name, deduction as trans_type, base_salary * (percentage/100) as amount
    from salary cross join deduction
),
emp_transactions as (
    select * from income_transactions union all
    select * from deduction_transactions
)
select emp_name, 
sum(if(trans_type = 'Basic', amount, 0)) as Basic,
sum(if(trans_type = 'Allowance', amount, 0)) as Allowance,
sum(if(trans_type = 'Others', amount, 0)) as Others,
sum(if(trans_type in ('Basic', 'Allowance', 'Others') , amount, 0)) as Gross,
sum(if(trans_type = 'Insurance', amount, 0)) as Insurance,
sum(if(trans_type = 'Health', amount, 0)) as Health,
sum(if(trans_type = 'House', amount, 0)) as House,
sum(if(trans_type in ('Insurance', 'Health', 'House') , amount, 0)) as Total_Deduction,
sum(if(trans_type in ('Basic', 'Allowance', 'Others') , amount, 0)) - sum(if(trans_type in ('Insurance', 'Health', 'House') , amount, 0)) as Netpay
from emp_transactions group by 1
""").show()

In [None]:
# student_tests layout: two nullable integer columns.
student_tests_schema = StructType([
    StructField(column, IntegerType(), True) for column in ("test_id", "marks")
])

# (test_id, marks) pairs.
student_tests_data = [
    (100, 55), (101, 55), (102, 60),
    (103, 58), (104, 40), (105, 50),
]

# Build, print, and register the table for SQL as "student_tests".
df_student_tests = spark.createDataFrame(student_tests_data, student_tests_schema)
df_student_tests.show()
df_student_tests.createOrReplaceTempView("student_tests")

In [None]:
# Query: keep the tests whose marks are higher than the previous test's marks.
# `prev` is the marks of the preceding row in test_id order; lag's third
# argument (-1) is used when there is no preceding row, so the first test
# passes the filter whenever its marks are above -1.
# `partition by 1` is a constant, so all rows fall into a single window partition.
spark.sql("""
with cte as (
    select test_id, marks, lag(marks,1,-1) over(partition by 1 order by test_id) as prev
    from student_tests
)

select test_id, marks from cte where marks > prev

""").show()

In [None]:
from datetime import datetime

# Day_Indicator layout: product code, flag string, and a date column.
day_indicator_schema = StructType([
    StructField(column, kind, True)
    for column, kind in (
        ("Product_ID", StringType()),
        ("Day_Indicator", StringType()),
        ("Dates", DateType()),
    )
])

# Rows for Day_Indicator: each product carries one flag string and appears once
# for every date from 04-Mar-2024 through 10-Mar-2024 (all AP755 rows first,
# then all XQ802 rows).
day_indicator_data = [
    (product_id, indicator, datetime.strptime(day_text, '%d-%b-%Y'))
    for product_id, indicator in (
        ('AP755', '1010101'),
        ('XQ802', '1000110'),
    )
    for day_text in (
        '04-Mar-2024',
        '05-Mar-2024',
        '06-Mar-2024',
        '07-Mar-2024',
        '08-Mar-2024',
        '09-Mar-2024',
        '10-Mar-2024',
    )
]

# Materialise the rows, register them for SQL as "day_indicator", and print them.
df_day_indicator = spark.createDataFrame(day_indicator_data, day_indicator_schema)
df_day_indicator.createOrReplaceTempView("day_indicator")
df_day_indicator.show()

In [None]:
# Query: keep a (product, date) row only when the product's Day_Indicator flag
# for that date's weekday is '1'.
# Day_Indicator is read as seven flags, character 1 = Monday .. character 7 =
# Sunday. Each set flag is turned into the dayofweek() number of its weekday
# (Spark's dayofweek returns 1 for Sunday through 7 for Saturday), and an unset
# flag becomes 0, which dayofweek() never returns. A row is kept when any of
# the seven columns equals the date's own dayofweek().
# The Monday flag is read with 1-based position 1. Position 0 only happened to
# work because Spark treats it like 1; other SQL engines do not.
# Flags are compared with the string '1' so no implicit string-to-int cast is
# needed.
spark.sql("""
with cte as (
    select Product_ID,
    Dates,
    if(substr(Day_Indicator,1,1) = '1', 2, 0) as Mon,
    if(substr(Day_Indicator,2,1) = '1', 3, 0) as Tue,
    if(substr(Day_Indicator,3,1) = '1', 4, 0) as Wed,
    if(substr(Day_Indicator,4,1) = '1', 5, 0) as Thu,
    if(substr(Day_Indicator,5,1) = '1', 6, 0) as Fri,
    if(substr(Day_Indicator,6,1) = '1', 7, 0) as Sat,
    if(substr(Day_Indicator,7,1) = '1', 1, 0) as Sun,
    dayOfWeek(Dates) as day
    from day_indicator
)
select Product_ID, Dates from cte
where
Mon = day or
Tue = day or
Wed = day or
Thu = day or
Fri = day or
Sat = day or
Sun = day
""").show()

In [None]:
# job_skills layout: integer row id, job role (often missing), skill name.
job_skills_schema = StructType([
    StructField(column, kind, True)
    for column, kind in (
        ("row_id", IntegerType()),
        ("job_role", StringType()),
        ("skills", StringType()),
    )
])

# The job role is present only on some rows; the other rows carry None.
job_skills_data = [
    (1,  'Data Engineer',  'SQL'),
    (2,  None,             'Python'),
    (3,  None,             'AWS'),
    (4,  None,             'Snowflake'),
    (5,  None,             'Apache Spark'),
    (6,  'Web Developer',  'Java'),
    (7,  None,             'HTML'),
    (8,  None,             'CSS'),
    (9,  'Data Scientist', 'Python'),
    (10, None,             'Machine Learning'),
    (11, None,             'Deep Learning'),
    (12, None,             'Tableau'),
]

# Build, print, and register the table for SQL as "job_skills".
df_job_skills = spark.createDataFrame(job_skills_data, job_skills_schema)
df_job_skills.show()
df_job_skills.createOrReplaceTempView("job_skills")

In [None]:
# Query: fill the missing job_role values downwards.
#   cte   - `segment` is a running count (in row_id order) of rows that have a
#           non-null job_role, so a row with a role and the null rows after it
#           share one segment number.
#   final - within each segment, first_value(job_role, true) picks the
#           non-null role (the `true` argument ignores NULLs) and applies it to
#           every row of the segment.
# `partition by 1` is a constant, so the running count spans the whole table.
spark.sql("""
with cte as (
    select *, sum(if(job_role is not null, 1, 0)) over(partition by 1 order by row_id) as segment
    from job_skills
)
select row_id, first_value(job_role,true) over(partition by segment) as job_role , skills from cte
""").show()

In [None]:
# orders layout: customer id, order date kept as a string, product id.
orders_schema = StructType([
    StructField(column, kind, True)
    for column, kind in (
        ("customer_id", IntegerType()),
        ("dates", StringType()),
        ("product_id", IntegerType()),
    )
])

# (customer_id, dates, product_id) rows.
orders_data = [
    (1, '2024-02-18', 101), (1, '2024-02-18', 102),
    (1, '2024-02-19', 101), (1, '2024-02-19', 103),
    (2, '2024-02-18', 104), (2, '2024-02-18', 105),
    (2, '2024-02-19', 101), (2, '2024-02-19', 106),
]

# Build, print, and register the table for SQL as "orders".
df_orders = spark.createDataFrame(orders_data, orders_schema)
df_orders.show()
df_orders.createOrReplaceTempView("orders")

In [None]:
# Query: for every (customer_id, dates) group, list the ordered product ids as
# one comma-separated string.
# The aggregated column is aliased as `products`, so it no longer carries an
# auto-generated expression name.
# sort_array makes the order of ids inside the string deterministic;
# collect_list on its own returns them in whatever order the rows arrive after
# the shuffle.
# customer_id is used for grouping only and is not part of the output, as in
# the original query.
spark.sql("""
with cte as (
    select dates, concat_ws(",", sort_array(collect_list(product_id))) as products
    from orders
    group by customer_id, dates
)
select * from cte
""").show()

In [23]:
# Friendship pairs, one (Friend1, Friend2) tuple per row.
data = [
    ('Jason', 'Mary'), ('Mike', 'Mary'), ('Mike', 'Jason'),
    ('Susan', 'Jason'), ('John', 'Mary'), ('Susan', 'Mary'),
]

# Column names only; Spark infers the column types from the data.
schema = ["Friend1", "Friend2"]

# Build the frame and expose it to SQL as the "Friends" temp view.
df = spark.createDataFrame(data=data, schema=schema)
df.createOrReplaceTempView("Friends")

# Print the frame to check its contents.
df.show()




+-------+-------+
|Friend1|Friend2|
+-------+-------+
|  Jason|   Mary|
|   Mike|   Mary|
|   Mike|  Jason|
|  Susan|  Jason|
|   John|   Mary|
|  Susan|   Mary|
+-------+-------+



                                                                                

In [27]:
# Query: list each person's friends.
# The inner subquery stacks every pair in both directions
# (Friend1, Friend2) and (Friend2, Friend1) with UNION ALL, so a friendship is
# visible from either side. Grouping by Friend1 and applying collect_set then
# gathers the distinct friend names of each person into an array.
# show(truncate=False) prints the arrays without shortening them.
spark.sql("""
with cte as (
select Friend1, collect_set(Friend2) as friends from (select Friend1, Friend2 from Friends 
                union all select Friend2, Friend1 from Friends)x
group by Friend1
    
)
select * from  cte

""").show(truncate=False)



+-------+--------------------------+
|Friend1|friends                   |
+-------+--------------------------+
|Jason  |[Mike, Susan, Mary]       |
|Susan  |[Jason, Mary]             |
|Mary   |[Mike, Jason, Susan, John]|
|Mike   |[Jason, Mary]             |
|John   |[Mary]                    |
+-------+--------------------------+



                                                                                

24/09/12 06:50:27 ERROR TaskSchedulerImpl: Lost executor 1 on 172.18.0.3: worker lost: Not receiving heartbeat for 60 seconds
24/09/12 06:50:27 ERROR TaskSchedulerImpl: Lost executor 0 on 172.18.0.5: worker lost: Not receiving heartbeat for 60 seconds
