In [1]:
# Make the local Spark installation importable via findspark; this must run
# before any `pyspark` import below.
import findspark
findspark.init()

# Initialize the SparkSession (getOrCreate reuses an active session if one
# exists) and grab its SparkContext for the RDD/broadcast APIs used later.
# NOTE(review): the bare `pyspark` module name is not referenced anywhere below.
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sc=spark.sparkContext

# import necessary libraries
from functools import reduce

from pyspark.sql import DataFrame
from pyspark.sql.functions import col, collect_list, concat, concat_ws, lit, sort_array, split, when
from pyspark.sql.types import StringType

# Prefix for the CSV output directories written below ("problem_1",
# "problem_2"). The directory name is appended by plain string concatenation,
# so a non-empty value should end with "/"; "" writes relative to Spark's
# default working directory.
filepath = ''

# Generate the input RDD and broadcast variable

In [2]:
# Raw inputs: each record is (id, three numeric values). Points become an RDD;
# the small list of centers is shared through a broadcast variable.
point_records = [
    ('p1', 20.9, 15.3, 20.4),
    ('p2', 0.6, 34.7, 8.1),
    ('p3', 12.1, 15.8, 2.3),
    ('p4', 15.0, 5.8, 16.9),
]
center_records = [
    ('c1', 10, 10, 10),
    ('c2', 20, 20, 20),
]
points = sc.parallelize(point_records)
centers = sc.broadcast(center_records)


# Generate DataFrames holding each point's and each center's total (sum of its three values)

In [3]:
# Per-point total of the three values, keeping only (points, Total_Points).
points_df = (
    spark.createDataFrame(points, ["points", "col1", "col2", "col3"])
    .withColumn("Total_Points", col("col1") + col("col2") + col("col3"))
    .select("points", "Total_Points")
)
points_df.show()

+------+------------------+
|points|      Total_Points|
+------+------------------+
|    p1|              56.6|
|    p2|43.400000000000006|
|    p3|              30.2|
|    p4|              37.7|
+------+------------------+



In [4]:
# Per-center total of the three values, built from the broadcast list on the
# driver and reduced to (center, Total_Center).
center_df = (
    spark.createDataFrame(centers.value, ["center", "center1", "center2", "center3"])
    .withColumn("Total_Center", col("center1") + col("center2") + col("center3"))
    .select("center", "Total_Center")
)
center_df.show()

+------+------------+
|center|Total_Center|
+------+------------+
|    c1|          30|
|    c2|          60|
+------+------------+



# Collect the totals into Python dicts on the driver (id -> total)

In [5]:
# {point id: Total_Points} collected to the driver.
points_rdd = points_df.rdd
keypair_points_rdd = points_rdd.map(lambda row: (row["points"], row["Total_Points"]))
dict_points = keypair_points_rdd.collectAsMap()

dict_points

{'p1': 56.6, 'p2': 43.400000000000006, 'p3': 30.2, 'p4': 37.7}

In [6]:
# {center id: Total_Center} collected to the driver.
center_rdd = center_df.rdd
keypair_center_rdd = center_rdd.map(lambda row: (row["center"], row["Total_Center"]))
dict_center = keypair_center_rdd.collectAsMap()

dict_center

{'c1': 30, 'c2': 60}

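# Get results: assign each point to the center whose total is closest

Note: this compares |sum(point) − sum(center)|, which is not the Euclidean distance between a point and a center. For this data the resulting assignments happen to match the nearest center by Euclidean distance, but in general the two can differ.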

In [7]:
# Absolute difference between every point total and every center total, keyed
# by point id + center id (e.g. "p1c1"); entries are inserted point-major.
# NOTE(review): plain concatenation could collide for other id schemes
# ("p1" + "1c" == "p11" + "c"); fine for the p*/c* ids used here.
new_dict = {
    point_id + center_id: abs(point_total - center_total)
    for point_id, point_total in dict_points.items()
    for center_id, center_total in dict_center.items()
}

new_dict

{'p1c1': 26.6,
 'p1c2': 3.3999999999999986,
 'p2c1': 13.400000000000006,
 'p2c2': 16.599999999999994,
 'p3c1': 0.1999999999999993,
 'p3c2': 29.8,
 'p4c1': 7.700000000000003,
 'p4c2': 22.299999999999997}

In [8]:
# One-row wide DataFrame with a column per point/center pair (p1c1, p1c2, ...).
# Built with createDataFrame, which accepts dict rows and infers the schema
# from them, instead of spark.read.json: read.json expects an RDD of JSON
# *strings*, and the dict only parsed there because PySpark str()-ified it and
# Spark's JSON reader tolerates single quotes. Values such as nan/inf would
# have turned into corrupt records.
# Schema inference from a dict orders columns by sorted key, as the JSON
# reader did, so the column order matches the previous output.
rdd = sc.parallelize([new_dict])
processed_df = spark.createDataFrame(rdd)
processed_df.show()

+----+------------------+------------------+------------------+------------------+----+-----------------+------------------+
|p1c1|              p1c2|              p2c1|              p2c2|              p3c1|p3c2|             p4c1|              p4c2|
+----+------------------+------------------+------------------+------------------+----+-----------------+------------------+
|26.6|3.3999999999999986|13.400000000000006|16.599999999999994|0.1999999999999993|29.8|7.700000000000003|22.299999999999997|
+----+------------------+------------------+------------------+------------------+----+-----------------+------------------+



In [9]:
def _assignment_column(point):
    """Label `point` with c1 or c2 by comparing its <point>c1 / <point>c2 columns.

    Yields "<point><\\tab>c1" when the c1 difference is strictly smaller and
    "<point><\\tab>c2" otherwise (ties and nulls fall to c2), aliased as the
    upper-cased point id, e.g. "P1".
    """
    return (when(col(point + "c1") < col(point + "c2"), point + "<\\tab>c1")
            .otherwise(point + "<\\tab>c2")
            .alias(point.upper()))


processed_df_1 = processed_df.select(*[_assignment_column(p) for p in ("p1", "p2", "p3", "p4")])

In [10]:
# One single-column frame per point, so the next cell can stack them vertically.
d1, d2, d3, d4 = (processed_df_1.select(name) for name in ("P1", "P2", "P3", "P4"))

In [11]:
# Stack the per-point frames into one column. union (of which unionAll is an
# alias) matches columns by position, so the result keeps the first frame's
# column name, "P1".
dfs = [d1, d2, d3, d4]
df_data = reduce(lambda stacked, nxt: stacked.union(nxt), dfs)
df_data.show()

+----------+
|        P1|
+----------+
|p1<\tab>c2|
|p2<\tab>c1|
|p3<\tab>c1|
|p4<\tab>c1|
+----------+



In [12]:
# Write the stacked assignments as a single CSV part file (coalesce(1)) under
# <filepath>problem_1, replacing any earlier output. The Hadoop committer
# option is meant to suppress the _SUCCESS marker file.
(df_data.coalesce(1)
    .write
    .mode("overwrite")
    .option("header", "true")
    .option("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
    .csv(filepath + "problem_1"))

# Problem 2: list the points assigned to each center

In [13]:
# Parse each "pX<\tab>cY" assignment back into its point and center ids.
# Split on the literal "<\tab>" delimiter (the backslash is regex-escaped)
# rather than fixed substr offsets, so ids of any width (e.g. "p10") still work.
assignment_parts = split(df_data["P1"], "<\\\\tab>")

# Group the points per center. collect_list has no ordering guarantee after
# the groupBy shuffle (sorting beforehand is not preserved), so sort the
# collected ids explicitly, then order the grouped rows by center.
# sort_array is lexicographic: "p10" would sort before "p2".
df_data_1 = (
    df_data
    .select(assignment_parts.getItem(0).alias("points"),
            assignment_parts.getItem(1).alias("center"))
    .groupBy("center")
    .agg(concat_ws(",", sort_array(collect_list("points"))).alias("points"))
    .orderBy("center")
)

df_data_1.show()

+------+--------+
|center|  points|
+------+--------+
|    c1|p2,p3,p4|
|    c2|      p1|
+------+--------+



In [14]:
# Preview each center's points as one "<center><\tab><points>" string
# (concat yields null if any part is null).
center_points_line = concat(col("center"), lit("<\\tab>"), col("points")).alias("data")
df_data_1.select(center_points_line).show()

+----------------+
|            data|
+----------------+
|c1<\tab>p2,p3,p4|
|      c2<\tab>p1|
+----------------+



In [15]:
# Write the per-center groups as a single CSV part file under
# <filepath>problem_2, replacing any earlier output.
# NOTE(review): this writes the raw center/points columns, not the
# "<center><\tab><points>" strings previewed in the previous cell. Values such
# as p2,p3,p4 contain the CSV delimiter and will be quoted. Confirm which
# format is expected.
(df_data_1.coalesce(1)
    .write
    .mode("overwrite")
    .option("header", "true")
    .option("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
    .csv(filepath + "problem_2"))