In [1]:
### Spark Tasks:
# 1. **Data Aggregation:**
# Read a dataset containing sales transactions. 
# Calculate the total sales amount for each product category using Spark's `groupBy` and aggregation functions.

In [212]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum
from pyspark.sql import functions as F
from pyspark.sql.functions import col, window, expr, date_format
from pyspark import SparkConf, SparkContext

In [25]:
spark = SparkSession.builder.appName("DataAggregation").getOrCreate()

In [26]:
file_location = "C:\\Ieva\\Python_Bootcamp\\Task folder\\24.08.2023_tasks\\sales.csv"

In [27]:
sales_df = spark.read.csv(file_location, header=True, inferSchema=True)

In [28]:
sales_df

DataFrame[Product: string, Category: string, Amount: int]

In [30]:
result_df = sales_df.groupBy("Category").agg(sum("Amount").alias("total_sales_amount"))

In [31]:
result_df.show()

+-----------+------------------+
|   Category|total_sales_amount|
+-----------+------------------+
|Electronics|               450|
|   Clothing|               125|
+-----------+------------------+



In [32]:
spark.stop()

In [93]:
# 2. **Log Analysis:**
# Analyze server log data to find the most frequently accessed URLs and their corresponding IP addresses.
# Use Spark SQL to query and visualize the results.

In [174]:
spark = SparkSession.builder.appName("LogAnalysis").getOrCreate()

In [175]:
log_df = spark.read.text("C:\\Ieva\\Python_Bootcamp\\Task folder\\24.08.2023_tasks\\server_log.txt").selectExpr("split(value, ' ')[2] as url", "split(value, ' ')[3] as ip_address")

In [176]:
log_df

DataFrame[url: string, ip_address: string]

In [177]:
url_ip_count = log_df.groupBy("url", "ip_address").count()
ranked_url_count = url_ip_count.withColumn("Access rank", F.dense_rank().over(Window.orderBy(F.desc("count"))))

In [178]:
result = ranked_url_count.select(
    F.col("url").alias("URL"),
    F.col("ip_address").alias("IP Address"),
    F.col("count").alias("Access Count"),
    F.col("Access rank")
)

In [179]:
result.show()

+------+-------------+------------+-----------+
|   URL|   IP Address|Access Count|Access rank|
+------+-------------+------------+-----------+
|/page2|192.168.1.103|           3|          1|
|/page1|192.168.1.102|           1|          2|
|/page2|192.168.1.101|           1|          2|
|/page3|192.168.1.100|           1|          2|
|/page1|192.168.1.100|           1|          2|
+------+-------------+------------+-----------+



In [180]:
spark.stop()

In [196]:
### MapReduce Tasks:

# 1. **URL Access Count:**
#    Given a log file containing records of URLs accessed and their corresponding timestamps, 
# use MapReduce to count the number of times each URL was accessed within a specific time window.

In [197]:
spark = SparkSession.builder.appName("URLAccessCount").getOrCreate()

In [198]:
log_data = spark.read.text("C:\\Ieva\\Python_Bootcamp\\Task folder\\24.08.2023_tasks\\access_log.txt")
log_df = log_data.selectExpr("split(value, ' ')[0] as date_time", "split(value, ' ')[2] as url")

In [199]:
time_window = "1 hour"
result = log_df.groupBy("url", window("date_time", time_window)).count()

In [200]:
formatted_result = result.select(
    expr("replace(url, ':', '')").alias("URL"),
    date_format(col("window.start"), "yyyy-MM-dd").alias("Date"),
    col("count").alias("Access_Count"))

In [201]:
formatted_result.show(truncate=False)

+------+----------+------------+
|URL   |Date      |Access_Count|
+------+----------+------------+
|/page2|2023-08-01|2           |
|/page1|2023-08-01|2           |
|/page3|2023-08-01|1           |
+------+----------+------------+



In [202]:
spark.stop()

In [None]:
# 2. **Follower Recommendations:**
#    Given a dataset representing a social network's following graph. 
# Use MapReduce to recommend the users to follow for another users who do have a mutual followers,
# but do not follow each other.

In [221]:
conf = SparkConf().setMaster("local[*]").setAppName("FollowerRecommendations")
sc = SparkContext(conf=conf)

In [222]:
follower_data = sc.textFile("C:\\Ieva\\Python_Bootcamp\\Task folder\\24.08.2023_tasks\\follower_graph.txt")

In [223]:
follower_pairs = follower_data.map(lambda line: line.split()).map(lambda parts: (parts[0], parts[1:]))

In [225]:
mutual_followers = follower_pairs \
    .flatMapValues(lambda follows: follows) \
    .join(follower_pairs) \
    .filter(lambda x: x[1][0] != x[1][1]) \
    .groupByKey() \
    .mapValues(list)

In [226]:
max_results = 10
results_counter = 0

In [227]:
for user, recommendations in mutual_followers.collect():
    print(f"User {user} might also follow: {recommendations}")
    results_counter += 1
    if results_counter >= max_results:
        break

User 4 might also follow: [('424', ['424', '625', '649', '218', '635', '936', '139', '406', '951', '934', '555', '513', '499', '236', '733', '865']), ('625', ['424', '625', '649', '218', '635', '936', '139', '406', '951', '934', '555', '513', '499', '236', '733', '865']), ('649', ['424', '625', '649', '218', '635', '936', '139', '406', '951', '934', '555', '513', '499', '236', '733', '865']), ('218', ['424', '625', '649', '218', '635', '936', '139', '406', '951', '934', '555', '513', '499', '236', '733', '865']), ('635', ['424', '625', '649', '218', '635', '936', '139', '406', '951', '934', '555', '513', '499', '236', '733', '865']), ('936', ['424', '625', '649', '218', '635', '936', '139', '406', '951', '934', '555', '513', '499', '236', '733', '865']), ('139', ['424', '625', '649', '218', '635', '936', '139', '406', '951', '934', '555', '513', '499', '236', '733', '865']), ('406', ['424', '625', '649', '218', '635', '936', '139', '406', '951', '934', '555', '513', '499', '236', '733'

In [219]:
sc.stop()