In [2]:
import os
import sys

# Point both the Spark workers and the driver at the interpreter that is
# running this notebook, so they all use the same Python installation.
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

from pyspark.sql import SparkSession


# Creating a spark session (getOrCreate reuses an already-running session)
spark = SparkSession.builder.appName('SparkLearning').getOrCreate()

In [16]:
# Import your libraries
import pyspark
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, BooleanType, StructField, StructType, DateType
from pyspark.sql.window import Window
from InputToDataFrame import convert_input_to_df

### Q1. [User with Most Approved Flags](https://platform.stratascratch.com/coding/2104-user-with-most-approved-flags/official-solution?code_type=6)


In [30]:
# user_flag_data
# Sample rows for the user-flag table, one record per line; the first line
# carries the column names.
input_data = '''
user_firstname	user_lastname	video_id	flag_id
Richard	Hasson	y6120QOlsfU	0cazx3
Mark	May	Ct6BUPvE2sM	1cn76u
Gina	Korman	dQw4w9WgXcQ	1i43zk
Mark	May	Ct6BUPvE2sM	1n0vef
Mark	May	jNQXAC9IVRw	1sv6ib
Gina	Korman	dQw4w9WgXcQ	20xekb
Mark	May	5qap5aO4i9A	4cvwuv
'''


# convert_input_to_df is a project-local helper (from InputToDataFrame).
# Judging by the printed result it yields a list of row tuples with the
# column-name line left out -- verify against the helper's source.
user_flags_data = convert_input_to_df(input_data)
print(user_flags_data)

[('Richard', 'Hasson', 'y6120QOlsfU', '0cazx3'), ('Mark', 'May', 'Ct6BUPvE2sM', '1cn76u'), ('Gina', 'Korman', 'dQw4w9WgXcQ', '1i43zk'), ('Mark', 'May', 'Ct6BUPvE2sM', '1n0vef'), ('Mark', 'May', 'jNQXAC9IVRw', '1sv6ib'), ('Gina', 'Korman', 'dQw4w9WgXcQ', '20xekb'), ('Mark', 'May', '5qap5aO4i9A', '4cvwuv')]


In [21]:
# flag_review_data
# Sample rows for the flag-review table, one record per line.
# NOTE(review): unlike the user-flag sample, this literal has no column-name
# line, and the output recorded for this cell is missing the first row
# (0cazx3) as well as every row after 4cvwuv. Confirm how convert_input_to_df
# treats the first line and rows with blank or trailing extra fields.
input_data = '''
0cazx3	FALSE		
1cn76u	TRUE	2022-03-15	REMOVED
1i43zk	TRUE	2022-03-15	REMOVED
1n0vef	TRUE	2022-03-15	REMOVED
1sv6ib	TRUE	2022-03-15	APPROVED
20xekb	TRUE	2022-03-17	REMOVED
4cvwuv	TRUE	2022-03-15	APPROVED	
4sd6dv	TRUE	2022-03-14	REMOVED
6jjkvn	TRUE	2022-03-16	APPROVED
7ks264	TRUE	2022-03-15	APPROVED
'''

# Parse the literal with the project-local helper and echo the parsed rows.
flag_review_data = convert_input_to_df(input_data)
print(flag_review_data)

[('1cn76u', 'TRUE', '2022-03-15', 'REMOVED'), ('1i43zk', 'TRUE', '2022-03-15', 'REMOVED'), ('1n0vef', 'TRUE', '2022-03-15', 'REMOVED'), ('1sv6ib', 'TRUE', '2022-03-15', 'APPROVED'), ('20xekb', 'TRUE', '2022-03-17', 'REMOVED'), ('4cvwuv', 'TRUE', '2022-03-15', 'APPROVED')]


In [35]:
# DataFrames
# Every column of both tables is declared as a nullable string, so the
# schemas are generated from the column names alone.
user_flags_schema = StructType([
    StructField(column_name, StringType(), nullable=True)
    for column_name in ("user_firstname", "user_lastname", "video_id", "flag_id")
])

flag_review_schema = StructType([
    StructField(column_name, StringType(), nullable=True)
    for column_name in ("flag_id", "reviewed_by_yt", "reviewed_date", "reviewed_outcome")
])

# User-flag table: schema first, then the rows.
user_flags = spark.createDataFrame(user_flags_data, schema=user_flags_schema)
user_flags.printSchema()
user_flags.show()

print("=" * 30)

# Flag-review table: same presentation.
flag_review = spark.createDataFrame(flag_review_data, schema=flag_review_schema)
flag_review.printSchema()
flag_review.show()

root
 |-- user_firstname: string (nullable = true)
 |-- user_lastname: string (nullable = true)
 |-- video_id: string (nullable = true)
 |-- flag_id: string (nullable = true)

+--------------+-------------+-----------+-------+
|user_firstname|user_lastname|   video_id|flag_id|
+--------------+-------------+-----------+-------+
|       Richard|       Hasson|y6120QOlsfU| 0cazx3|
|          Mark|          May|Ct6BUPvE2sM| 1cn76u|
|          Gina|       Korman|dQw4w9WgXcQ| 1i43zk|
|          Mark|          May|Ct6BUPvE2sM| 1n0vef|
|          Mark|          May|jNQXAC9IVRw| 1sv6ib|
|          Gina|       Korman|dQw4w9WgXcQ| 20xekb|
|          Mark|          May|5qap5aO4i9A| 4cvwuv|
+--------------+-------------+-----------+-------+

root
 |-- flag_id: string (nullable = true)
 |-- reviewed_by_yt: string (nullable = true)
 |-- reviewed_date: string (nullable = true)
 |-- reviewed_outcome: string (nullable = true)

+-------+--------------+-------------+----------------+
|flag_id|reviewed_by_y

In [33]:
# Approach :=> 1
# Join flags to their reviews, keep approved outcomes (case-insensitive),
# count distinct videos per full name and keep everyone tied for first place.
by_video_count_desc = Window.orderBy(F.desc("video_count"))
result = (
    user_flags.join(flag_review, on="flag_id", how="inner")
    .filter(F.lower(F.col("reviewed_outcome")) == "approved")
    .withColumn(
        "username",
        F.concat(F.col("user_firstname"), F.lit(" "), F.col("user_lastname")),
    )
    .groupby("username")
    .agg(F.countDistinct("video_id").alias("video_count"))
    .withColumn("rank", F.rank().over(by_video_count_desc))
    .filter(F.col("rank") == 1)
    .select("username")
)
result.show()

+--------+
|username|
+--------+
|Mark May|
+--------+



In [34]:
# Approach :=> 2
# Distinct approved videos per (first name, last name); the outcome is matched
# case-sensitively against "APPROVED" here.
approved_counts = user_flags.join(flag_review, on="flag_id", how="inner")
approved_counts = approved_counts.where(F.col("reviewed_outcome") == "APPROVED")
approved_counts = approved_counts.groupBy("user_firstname", "user_lastname").agg(
    F.countDistinct("video_id").alias("total_video_count")
)

# Dense-rank by the count and keep only the users sharing the top rank;
# the helper columns are dropped once the filter is applied.
windowSpec = Window.orderBy(F.col("total_video_count").desc())
top_users = approved_counts.withColumn("rank", F.dense_rank().over(windowSpec))
top_users = top_users.where(F.col("rank") == 1).drop("total_video_count", "rank")

# Build the single "username" output column from the two name parts.
df = top_users.select(F.expr("user_firstname || ' ' || user_lastname").alias("username"))

df.show()

+--------+
|username|
+--------+
|Mark May|
+--------+



# Q2. [Workers With The Highest Salaries](https://platform.stratascratch.com/coding/10353-workers-with-the-highest-salaries?code_type=6)

In [None]:
# Approach :=> 1
from pyspark.sql.functions import col, sum, rank, desc
from pyspark.sql.window import Window

# Rank every worker/title row by salary (highest first); rank() keeps ties,
# so every title attached to the top salary is returned.
by_salary_desc = Window.orderBy(desc("salary"))
worker_titles = worker.join(title, worker.worker_id == title.worker_ref_id)
df = (
    worker_titles
    .withColumn("rnk", rank().over(by_salary_desc))
    .filter(col("rnk") == 1)
    .select(col("worker_title").alias("best_paid_title"))
)

df.toPandas()


In [None]:
# Approach :=> 2

import pyspark.sql.functions as F

worker_titles = worker.join(title, worker.worker_id == title.worker_ref_id)

# Pull the maximum salary back to the driver, then keep the rows that match it.
top_salary = worker_titles.agg(F.max(worker_titles["salary"])).first()[0]
result = (
    worker_titles
    .filter(worker_titles["salary"] == top_salary)
    .select(F.col("worker_title").alias("best_paid_title"))
)
result.toPandas()


# Q3: [Election Results](https://platform.stratascratch.com/coding/2099-election-results?code_type=6)

In [None]:
# Approach :=> 1

# Import your libraries
import pyspark.sql.functions as F

# Start writing code
# One vote per voter, split evenly over the candidates that voter named
# (count() ignores null candidates); the share is rounded to 3 decimals.
votes_per_voter = voting_results.groupBy("voter").agg(
    F.count("candidate").alias("vote_count")
)
df = votes_per_voter.withColumn("vote_value", F.round(1.0 / F.col("vote_count"), 3))

# Attach each voter's share to every ballot row they cast.
combined_df = df.join(voting_results, "voter").select("candidate", "vote_value")

# Total the shares per candidate and keep the single highest total.
result = (
    combined_df.groupBy("candidate")
    .agg(F.round(F.sum("vote_value"), 3).alias("total_vote_value"))
    .orderBy(F.col("total_vote_value").desc())
    .limit(1)
    .select("candidate")
)
# To validate your solution, convert your final pySpark df to a pandas df
result.toPandas()

In [None]:
# Approach :=> 2

from pyspark.sql import functions as F

# Share of a vote each voter gives to every candidate they named
vote_share = F.round(1.0 / F.count("candidate"), 3).alias("vote_value")
df = voting_results.groupBy("voter").agg(vote_share)

# Sum the shares per candidate (ballots without a candidate are ignored)
ballots = df.join(voting_results, "voter").where(F.col("candidate").isNotNull())
net_votes = ballots.groupBy("candidate").agg(F.sum("vote_value").alias("net_vote"))

# Keep only the name of the candidate with the largest total
merged_df = net_votes.orderBy(F.col("net_vote").desc()).select("candidate").limit(1)

# Show the result as a Pandas DataFrame
merged_df.toPandas()


In [None]:
# Approach :=> 3

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Share of a vote each voter gives to every candidate they named
vote_share = F.round(1.0 / F.count("candidate"), 3).alias("vote_value")
df = voting_results.groupBy("voter").agg(vote_share)

# Sum the shares per candidate (ballots without a candidate are ignored)
ballots = df.join(voting_results, "voter").where(F.col("candidate").isNotNull())
merged_df = (
    ballots.groupBy("candidate")
    .agg(F.sum("vote_value").alias("net_vote"))
    .select("candidate", "net_vote")
)

# dense_rank keeps every candidate tied for the highest total, unlike limit(1)
by_net_vote_desc = Window.orderBy(F.col("net_vote").desc())
merged_df = (
    merged_df.withColumn("rank", F.dense_rank().over(by_net_vote_desc))
    .where(F.col("rank") == 1)
    .select("candidate")
)

# Print the winners, then return them as a Pandas DataFrame
merged_df.show()

merged_df.toPandas()


# Q4: [Make a report showing the number of survivors and non-survivors by passenger class](https://platform.stratascratch.com/coding/9881-make-a-report-showing-the-number-of-survivors-and-non-survivors-by-passenger-class?code_type=6)

In [None]:
# Import your libraries
from pyspark.sql.functions import col, sum, when

# Start writing code
# One output column per passenger class; each is a conditional count
# (1 when the row belongs to that class, 0 otherwise) summed per survival flag.
class_columns = {1: "first_class", 2: "second_class", 3: "third_class"}
passengers_in_class = [
    sum(when(col("pclass") == pclass, 1).otherwise(0)).alias(column_name)
    for pclass, column_name in class_columns.items()
]

df = titanic.select("survived", "pclass").groupBy("survived").agg(*passengers_in_class)

# To validate your solution, convert your final pySpark df to a pandas df
df.toPandas()


# Q5: [Bikes Last Used](https://platform.stratascratch.com/coding/10176-bikes-last-used?code_type=6)

In [None]:
# Approach :=> 1
# Import your libraries
from pyspark.sql.functions import col, max

# Start writing code
# Latest end_time per bike, most recently used bikes first.
df = (
    dc_bikeshare_q1_2012
    .select("bike_number", "end_time")
    .groupBy("bike_number")
    .agg(max("end_time").alias("last_used"))
    .orderBy(col("last_used").desc())
)

# To validate your solution, convert your final pySpark df to a pandas df
df.toPandas()

In [None]:
# Approach :=> 2
# Import your libraries
import pyspark
from pyspark.sql import Window 
from pyspark.sql.functions import desc , row_number

# Start writing code
# Number each bike's trips from newest to oldest; row 1 is the latest return.
window = Window.partitionBy("bike_number").orderBy(desc("end_time"))
df = dc_bikeshare_q1_2012.select("bike_number", "end_time")
df = df.withColumn("rnk", row_number().over(window))

# Use the same output column name ("last_used") and the same
# most-recent-first ordering as the groupBy/max approach, so both
# approaches produce the same result.
df = (
    df.filter(df.rnk == 1)
    .select(df.bike_number, df.end_time.alias("last_used"))
    .orderBy(desc("last_used"))
)
df.toPandas()

# Q6: Compare Source and Target DataFrames

In [4]:
# Two small snapshots sharing the same (id, name) layout.
source_columns = ['id', 'name']
source_data = [(1, 'A'), (2, 'B'), (3, 'C'), (4, 'D')]

target_columns = ['id', 'name']
target_data = [(1, 'A'), (2, 'B'), (4, 'X'), (5, 'F')]

source_df = spark.createDataFrame(source_data, source_columns)
target_df = spark.createDataFrame(target_data, target_columns)

# Print both frames with a separator line between them.
source_df.show()
print("=" * 30)
target_df.show()


+---+----+
| id|name|
+---+----+
|  1|   A|
|  2|   B|
|  3|   C|
|  4|   D|
+---+----+

+---+----+
| id|name|
+---+----+
|  1|   A|
|  2|   B|
|  4|   X|
|  5|   F|
+---+----+



In [14]:
from pyspark.sql.functions import col, when, coalesce

# A full outer join keeps ids that exist on only one side; the missing side
# shows up as nulls.
df = source_df.alias("s").join(target_df.alias("t"), col("s.id") == col("t.id"), "full")
df = df.select(
        col("s.id").alias("s_id"),
        col("s.name").alias("s_name"),
        col("t.id").alias("t_id"),
        col("t.name").alias("t_name"),
    )

# Classify each row; rows that match on both id and name fall through every
# branch and get a null comment.
df = df.withColumn(
        "comments",
        when((col("s_id") == col("t_id")) & (col("s_name") != col("t_name")), "Mis-match")
        .when(col("s_id").isNull(), "new in target")  # label typo fixed (was "targer")
        .when(col("t_id").isNull(), "new in source")
    )

# Keep only the differing rows and report whichever id is present.
df = (
    df.filter(col("comments").isNotNull())
    .withColumn("Id", coalesce(col("s_id"), col("t_id")))
    .select(col("Id"), col("comments"))
)

df.show()

+---+-------------+
| Id|     comments|
+---+-------------+
|  3|new in source|
|  4|    Mis-match|
|  5|new in targer|
+---+-------------+



# Q7: [Most Profitable Companies](https://platform.stratascratch.com/coding/10354-most-profitable-companies?code_type=6)

In [None]:
# Import your libraries
# All imports sit at the top of the cell: `desc` used to be called before
# anything in this cell imported it, which raises NameError on a fresh kernel.
import pyspark
from pyspark.sql import functions as F
from pyspark.sql.functions import *  # kept as-is; other cells may rely on the names it brings in

# Start writing code
# Variant 1: the three rows with the highest profit, without aggregation.
df = forbes_global_2010_2014
df = df.orderBy(F.desc('profits')).select('company', 'profits').limit(3)

# To validate your solution, convert your final pySpark df to a pandas df
df.toPandas()

# Variant 2: total the profits per company first, then take the top three.
df = forbes_global_2010_2014
df = (
    df.groupBy(df.company)
    .agg(F.sum(df.profits).alias("profits"))
    .orderBy(F.desc("profits"))
    .limit(3)
)

# Print the aggregated top three
df.show()

In [None]:
# Import your libraries
from pyspark.sql.functions import col, dense_rank, desc, sum
from pyspark.sql.window import Window

# Start writing code
# Total profit per company.
company_profits = forbes_global_2010_2014.groupBy("company").agg(
    sum("profits").alias("total_profits")
)

# dense_rank keeps companies tied on profit, so more than three rows can be returned.
by_profit_desc = Window.orderBy(desc("total_profits"))
df = (
    company_profits
    .withColumn("rnk", dense_rank().over(by_profit_desc))
    .filter(col("rnk") <= 3)
    .select("company", "total_profits")
)

df.show()

# Q8: [Users By Average Session Time](https://platform.stratascratch.com/coding/10352-users-by-avg-session-time?code_type=6)

In [None]:
# Import your libraries
from pyspark.sql.functions import col, when, min, max, avg, to_date, unix_timestamp

# Start writing code
# Only page loads and page exits matter; tag each event with its calendar day.
df = facebook_web_log
df = df.where("action in ('page_load', 'page_exit')") \
        .withColumn("date", to_date(col("timestamp")))

# Per user and day: the latest page_load and the earliest page_exit.
# Days missing either event are dropped.
df = df.groupBy(col("user_id"), col("date")) \
      .agg(
           max(when(col("action") == "page_load", col("timestamp"))).alias("load_time"),
           min(when(col("action") == "page_exit", col("timestamp"))).alias("exit_time")
          ) \
      .filter(col("load_time").isNotNull() & col("exit_time").isNotNull())

# Calculate time difference in seconds.
# Subtracting the two timestamp columns directly does not give a plain number
# of seconds, so both sides are converted to epoch seconds first.
df = df.withColumn(
    "duration",
    unix_timestamp(col("exit_time")) - unix_timestamp(col("load_time")),
)

# Average session length per user, in seconds.
df = df.groupBy(col("user_id")) \
        .agg(avg(col("duration")).alias("duration"))

df.toPandas()




In [None]:
import pyspark
from pyspark.sql.functions import *
from pyspark.sql.window import Window

# Keep page loads/exits only and make sure the timestamp column is a real timestamp.
page_events = (
    facebook_web_log
    .where(col('action').isin(['page_load', 'page_exit']))
    .withColumn('timestamp', col('timestamp').cast("timestamp"))
)

# Per user and day: latest load and earliest exit; rows with any null are dropped.
daily_sessions = (
    page_events
    .groupby('user_id', date_format('timestamp', 'yyyy-MM-dd').alias('date'))
    .agg(
        max(when(col('action') == 'page_load', col('timestamp'))).alias('start'),
        min(when(col('action') == 'page_exit', col('timestamp'))).alias('end'),
    )
    .dropna()
)

# Mean session length per user in seconds, collected as a pandas DataFrame.
session_seconds = unix_timestamp('end') - unix_timestamp('start')
df = (
    daily_sessions
    .groupby('user_id')
    .agg(mean(session_seconds).alias('duration'))
    .toPandas()
)

# Q9: [Activity Rank](https://platform.stratascratch.com/coding/10351-activity-rank?code_type=6)

In [None]:
from pyspark.sql.functions import count, col, row_number
from pyspark.sql.window import Window

# Emails sent per sender
emails_sent = count("*").alias("total_emails")
email_counts_df = google_gmail_emails.groupBy("from_user").agg(emails_sent)

# row_number gives every sender a unique rank: most emails first,
# ties broken alphabetically by sender
window_spec = Window.orderBy(col("total_emails").desc(), col("from_user").asc())
activity_rank = row_number().over(window_spec)
ranked_emails_df = email_counts_df.withColumn("rank", activity_rank)

# Print the ranking
ranked_emails_df.show()

# Pandas copy of the ranking for validation
ranked_emails_df.toPandas()


# Q10: [Finding User Purchases](https://platform.stratascratch.com/coding/10322-finding-user-purchases?code_type=6)

In [None]:
from pyspark.sql.functions import col, lag, datediff
from pyspark.sql.window import Window

# For every purchase, look up the same user's previous purchase date.
window_spec = Window.partitionBy("user_id").orderBy("created_at")
previous_purchase = lag(col("created_at")).over(window_spec)

# Users with at least one purchase made within 7 days of their previous one.
# A user's first purchase has no predecessor, so its day count is null and it
# never passes the filter.
df = (
    amazon_transactions
    .withColumn("previous_purchase_date", previous_purchase)
    .withColumn("days", datediff(col("created_at"), col("previous_purchase_date")))
    .filter(col("days") <= 7)
    .select("user_id")
    .distinct()
)

df.show()
df.toPandas()


# Q11: [Monthly Percentage Difference](https://platform.stratascratch.com/coding/10319-monthly-percentage-difference?code_type=6)

In [None]:
import pyspark
from pyspark.sql.functions import date_format, col, sum, lag, round
from pyspark.sql.window import Window

# Revenue per "yyyy-MM" month
monthly_revenue = (
    sf_transactions
    .withColumn("date", date_format("created_at", "yyyy-MM"))
    .groupBy("date")
    .agg(sum("value").alias("total_revenue"))
)

# Previous month's revenue comes from lag() over the months in order;
# the first month has no predecessor, so its change is null.
window = Window.orderBy("date")
pct_change = (
    (col("total_revenue") - col("last_month_revenue")) / col("last_month_revenue") * 100
)

df = (
    monthly_revenue
    .withColumn("last_month_revenue", lag("total_revenue").over(window))
    .withColumn("percentage_change", pct_change)
    .withColumn("revenue_diff_pct", round(col("percentage_change"), 2))
    .orderBy("date")
    .select("date", "revenue_diff_pct")
)

# To validate your solution, convert your final PySpark DataFrame to a pandas DataFrame
df.show()
df.toPandas()


In [None]:
import pyspark
from pyspark.sql.functions import date_format, col, sum, lag, round
from pyspark.sql.window import Window

# Months are compared in chronological order
window = Window.orderBy("date")

# Monthly revenue and the previous month's revenue are produced in the same
# aggregation: the window function is applied on top of the grouped sum.
monthly_total = sum("value")
df = (
    sf_transactions
    .withColumn("date", date_format("created_at", "yyyy-MM"))
    .groupBy("date")
    .agg(
        monthly_total.alias("total_revenue"),
        lag(monthly_total).over(window).alias("last_month_revenue"),
    )
)

# Month-over-month change in percent, rounded to 2 decimal places
pct_change = (
    (col("total_revenue") - col("last_month_revenue")) / col("last_month_revenue") * 100
)
df = (
    df.withColumn("percentage_change", pct_change)
    .withColumn("revenue_diff_pct", round(col("percentage_change"), 2))
    .orderBy("date")
    .select("date", "revenue_diff_pct")
)

# To validate your solution, convert your final PySpark DataFrame to a pandas DataFrame
df.show()
df.toPandas()


# Q12: [New Products](https://platform.stratascratch.com/coding/10318-new-products/official-solution?code_type=6)

In [None]:
# Import explicitly so the cell runs on a fresh kernel; without this, `sum`
# would be Python's built-in and `col`/`when` would be undefined.
from pyspark.sql.functions import col, sum, when

# Score each launch -1 if it happened in 2019 and +1 otherwise, so the
# per-company sum is (other launches) - (2019 launches).
# NOTE(review): this equals "2020 minus 2019" only if the table holds just
# those two years -- confirm against the dataset.
df1 = car_launches.withColumn("diff", when(col("year") == '2019', -1).otherwise(1))
df2 = df1.groupBy("company_name").agg(sum("diff").alias("net_products"))

# df2 already has exactly the two output columns, so one conversion is enough.
df2.toPandas()

In [None]:
# Import explicitly so the cell runs on a fresh kernel instead of relying on
# names left behind by other cells.
from pyspark.sql.functions import coalesce, col, count, lag, lit
from pyspark.sql.window import Window

# Within each company, years are visited in ascending order
window_spec = Window.partitionBy(col("company_name")).orderBy(col("year").asc())

# Calculate the count of products for each company and year, together with
# the count from that company's previous year in the data
df = car_launches.groupBy("company_name", "year") \
                 .agg(
                     count("product_name").alias("curr"),
                     lag(count("product_name")).over(window_spec).alias("prev")
                 )

# Filter the data for the year 2020 and calculate the net difference;
# a company with no earlier year has a null "prev", which is treated as 0
df = df.filter(col("year") == "2020") \
      .withColumn("net_products", col("curr") - coalesce(col("prev"), lit(0))) \
      .select(col("company_name"), col("net_products"))

# Select columns and display the result
df.show()

# Convert the Spark DataFrame to a Pandas DataFrame for validation
df.toPandas()

# Q13: [Salaries Differences](https://platform.stratascratch.com/coding/10308-salaries-differences?code_type=6)

In [None]:
# Import your libraries
import pyspark
from pyspark.sql import functions as F

# Start writing code
# The result goes into its own variable. Assigning it back to `db_employee`
# would replace the input table, so this cell could not be re-run and the
# other approaches that read `db_employee` would fail.
salary_gap = (
    db_employee.join(db_dept, db_employee['department_id'] == db_dept['id'])
    .filter(db_dept['department'].isin(['engineering', 'marketing']))
    # An empty groupby() plus pivot yields a single row with one column per
    # department, each holding that department's maximum salary.
    .groupby()
    .pivot('department')
    .agg(F.max('salary'))
    .select(F.abs(F.col('engineering') - F.col('marketing')).alias('salary_difference'))
)


# To validate your solution, convert your final pySpark df to a pandas df
salary_gap.toPandas()

In [None]:
# Import your libraries
from pyspark.sql.functions import *

# Start writing code
employees_with_dept = db_employee.join(db_dept, db_employee.department_id == db_dept.id, "inner")

# Conditional maxima: each aggregate only sees salaries of its own department.
top_salaries = employees_with_dept.agg(
    max(when(col("department") == "marketing", col("salary"))).alias("max_marketing_salary"),
    max(when(col("department") == "engineering", col("salary"))).alias("max_engineering_salary"),
)

# Absolute gap between the two maxima, as the only output column.
salary_gap = abs(col("max_marketing_salary") - col("max_engineering_salary"))
df = top_salaries.withColumn("salary_difference", salary_gap).select("salary_difference")

# To validate your solution, convert your final pySpark df to a pandas df
df.show()
df.toPandas()

In [None]:
from pyspark.sql.functions import *

# Start writing code
# The result goes into its own variable so the input table `db_employee` is
# not replaced and the cell can be re-run. The difference is wrapped in abs()
# so it cannot come out negative, matching the other two approaches.
salary_gap = db_employee \
    .join(db_dept, on=db_employee.department_id == db_dept.id, how="inner") \
    .select(abs(max(when(col("department") == "marketing", col("salary"))) -
                max(when(col("department") == "engineering", col("salary"))))
            .alias("salary_difference"))

# To validate your solution, convert your final pySpark df to a pandas df
salary_gap.toPandas()

# Q14: [Risky Projects](https://platform.stratascratch.com/coding/10304-risky-projects?code_type=6)

In [None]:
from pyspark.sql.functions import *

# Project length in days
df1 = linkedin_projects.withColumn("days", datediff(col("end_date"), col("start_date")))

# Per project: combined daily salary (yearly salary / 365) and combined
# yearly salary of everyone assigned to it
df2 = (
    linkedin_emp_projects
    .join(linkedin_employees, linkedin_emp_projects.emp_id == linkedin_employees.id)
    .withColumn("salary_per_day", col("salary") / 365)
    .groupBy("project_id")
    .agg(
        sum("salary_per_day").alias("total_salary_per_day"),
        sum("salary").alias("total_salary_per_year"),
    )
)

# Staff cost prorated to the project's duration, flagged against the budget
is_overbudget = when(col("prorated_expense") > col("budget"), "Yes").otherwise("No")
df = (
    df1.join(df2, df1.id == df2.project_id)
    .withColumn("prorated_expense", col("total_salary_per_day") * col("days"))
    .withColumn("overbudget", is_overbudget)
    .filter(col("overbudget") == "Yes")
    # ceil rounds the expense up to the next whole dollar
    .select("title", "budget", ceil("prorated_expense").alias("prorated_expense"))
)

# Show the result
df.show()
df.toPandas()



In [None]:
# Import your libraries
from pyspark.sql.functions import *
from pyspark.sql.window import Window

linkedin_emp_projects = linkedin_emp_projects.join(linkedin_projects, linkedin_emp_projects["project_id"]==linkedin_projects["id"],"inner")\
    .select("project_id","emp_id", "title","budget","start_date","end_date")\
    .join(linkedin_employees, linkedin_emp_projects["emp_id"]==linkedin_employees["id"])\
    .select("project_id","emp_id","salary", "title","budget","start_date","end_date")

linkedin_emp_projects = linkedin_emp_projects.withColumn("duration", datediff(col("end_date"),col("start_date"))/365).distinct()

window = Window.partitionBy("title")
linkedin_emp_projects = linkedin_emp_projects.withColumn("total_sal", ceil(sum(col("salary")).over(window)*col("duration")) ).select("title","budget", "total_sal").distinct().filter(col("total_sal")>col("budget"))
# To validate your solution, convert your final pySpark df to a pandas df
linkedin_emp_projects.toPandas()