In [1]:
import sys
import os
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext, SparkSession, DataFrame
import pyspark.sql.functions as F
from pyspark.sql.window import Window 
from pyspark.sql.types import DateType

# Configure the Python interpreter paths used by PySpark and the directories
# holding the Hadoop/YARN client configuration. This runs before the Spark
# session is created in the next cell.
os.environ.update(
    {
        "PYSPARK_PYTHON": "/usr/bin/python3",
        "PYSPARK_DRIVER_PYTHON": "/usr/bin/python3",
        "YARN_CONF_DIR": "/etc/hadoop/conf",
        "HADOOP_CONF_DIR": "/etc/hadoop/conf/",
    }
)

In [2]:
# Create (or reuse) the notebook-wide Spark session with YARN as the master.
spark = (
    SparkSession.builder
    .master("yarn")
    .appName("Project_7_4")
    .getOrCreate()
)

24/09/03 13:23:00 WARN Utils: Your hostname, fhmugce59tbv43vsp0ua resolves to a loopback address: 127.0.1.1; using 172.16.0.3 instead (on interface eth0)
24/09/03 13:23:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/03 13:23:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/09/03 13:23:03 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [3]:
def geo_transform(geo_path: str, sql) -> DataFrame:
    """Load the city reference CSV and expose numeric coordinates.

    The file is read with a header row and ``;`` as the field delimiter.
    In the textual ``lat`` / ``lng`` columns every comma is replaced by a dot
    before the value is cast to ``float``; the results are stored as
    ``lat_g`` / ``lng_g`` and the original ``lat`` / ``lng`` columns are dropped.

    Args:
        geo_path: Path of the CSV file to read.
        sql: Object exposing a ``.read`` DataFrameReader (the notebook passes
            a SparkSession; ``main`` passes an SQLContext).

    Returns:
        A persisted DataFrame with the remaining CSV columns plus
        ``lat_g`` and ``lng_g``.
    """

    def _comma_decimal_as_float(column_name: str):
        # "-33,865" -> "-33.865" -> float
        return F.regexp_replace(column_name, ",", ".").cast("float")

    raw_cities = (
        sql.read
        .option("header", True)
        .option("delimiter", ";")
        .csv(geo_path)
    )
    with_numeric_coords = (
        raw_cities
        .withColumn("lat_g", _comma_decimal_as_float("lat"))
        .withColumn("lng_g", _comma_decimal_as_float("lng"))
    )
    return with_numeric_coords.drop("lat", "lng").persist()

In [4]:
# Smoke test: load the city reference data and preview the first rows.
geo_transform_df = geo_transform(
    geo_path="/user/denis19/data/geo/cities/actual/geo.csv",
    sql=spark,
)
geo_transform_df.show()

[Stage 1:>                                                          (0 + 1) / 1]

+---+----------+--------+--------+
| id|      city|   lat_g|   lng_g|
+---+----------+--------+--------+
|  1|    Sydney| -33.865|151.2094|
|  2| Melbourne|-37.8136|144.9631|
|  3|  Brisbane|-27.4678|153.0281|
|  4|     Perth|-31.9522|115.8589|
|  5|  Adelaide|-34.9289|138.6011|
|  6|Gold Coast|-28.0167|   153.4|
|  7|Cranbourne|-38.0996|145.2834|
|  8|  Canberra|-35.2931|149.1269|
|  9| Newcastle|-32.9167|  151.75|
| 10|Wollongong|-34.4331|150.8831|
| 11|   Geelong|  -38.15|  144.35|
| 12|    Hobart|-42.8806| 147.325|
| 13|Townsville|-19.2564|146.8183|
| 14|   Ipswich|-27.6167|152.7667|
| 15|    Cairns|-16.9303|145.7703|
| 16| Toowoomba|-27.5667|  151.95|
| 17|    Darwin|-12.4381|130.8411|
| 18|  Ballarat|  -37.55|  143.85|
| 19|   Bendigo|  -36.75|144.2667|
| 20|Launceston|-41.4419| 147.145|
+---+----------+--------+--------+
only showing top 20 rows



                                                                                

In [5]:
def events_transform_from(events_path: str, sql) -> DataFrame:
    """Return the most recent geotagged message of every sender.

    Only events with ``event_type = "message"`` are considered, and rows with
    a NULL ``lat``, ``lon`` or ``event.message_from`` are discarded. For each
    ``message_from`` the row with the greatest ``date`` is kept.

    Args:
        events_path: Path of the events parquet dataset.
        sql: Object exposing a ``.read`` DataFrameReader (SparkSession or
            SQLContext).

    Returns:
        A persisted DataFrame with one row per ``message_from`` and the
        columns ``message_id_from``, ``message_from``,
        ``subscription_channel``, ``lat_eff``, ``lon_eff``, ``date``.
    """
    # No intermediate persist here: this frame is consumed exactly once by the
    # window step below, so caching it would only occupy executor storage.
    geotagged_messages = (
        sql.read.parquet(events_path)
        # keep only events of type "message"
        .where('event_type = "message"')
        # pick the required fields; lat/lon are renamed for convenience
        .selectExpr(
            "event.message_id as message_id_from",
            "event.message_from",
            "event.subscription_channel",
            "lat as lat_eff",
            "lon as lon_eff",
            "date",
        )
        # drop rows without coordinates or without a sender
        .where("lat_eff IS NOT NULL AND lon_eff IS NOT NULL AND message_from IS NOT NULL")
    )

    # Newest message first within each sender. Several messages can share the
    # same `date`, so message_id_from is added purely as a tie-breaker to make
    # the selected row deterministic between runs.
    newest_first = (
        Window
        .partitionBy("message_from")
        .orderBy(F.col("date").desc(), F.col("message_id_from").desc())
    )

    latest_per_sender = (
        geotagged_messages
        # rank the rows of each sender from newest to oldest
        .withColumn("row_number", F.row_number().over(newest_first))
        # keep only the newest row of every sender
        .filter(F.col("row_number") == 1)
        .drop("row_number")
        .persist()
    )

    return latest_per_sender

In [6]:
# Smoke test: build the latest-message-per-sender frame and show the rows
# that carry a subscription channel.
events_transform_from_df = events_transform_from(
    events_path="/user/master/data/geo/events",
    sql=spark,
)
events_transform_from_df.where(F.col("subscription_channel").isNotNull()).show()


24/09/03 13:23:52 WARN SharedInMemoryCache: Evicting cached table partition metadata from memory due to size constraints (spark.sql.hive.filesourcePartitionFileCacheSize = 262144000 bytes). This may impact query planning performance.
                                                                                

+---------------+------------+--------------------+-------+-------+----+
|message_id_from|message_from|subscription_channel|lat_eff|lon_eff|date|
+---------------+------------+--------------------+-------+-------+----+
+---------------+------------+--------------------+-------+-------+----+



In [7]:
def events_subscriptions(events_path: str, sql) -> DataFrame:
    """Collect the subscription channels of every user.

    Reads ``event.user`` and ``event.subscription_channel`` from the events
    dataset, discards rows where either is NULL and gathers the channels of
    each user into a list.

    Args:
        events_path: Path of the events parquet dataset.
        sql: Object exposing a ``.read`` DataFrameReader.

    Returns:
        A persisted DataFrame with columns ``user`` and ``chans`` (list of
        the user's channels, as produced by ``collect_list``).
    """
    # (user, channel) pairs; "ch" is a short alias for subscription_channel
    user_channel_pairs = (
        sql.read.parquet(events_path)
        .selectExpr("event.user as user", "event.subscription_channel as ch")
        .where("user is not null and ch is not null")
    )

    # one row per user with all of that user's channels in "chans"
    channels_per_user = (
        user_channel_pairs
        .groupBy("user")
        .agg(F.collect_list("ch").alias("chans"))
    )

    return channels_per_user.persist()

In [8]:
# Smoke test: build the per-user channel lists and preview them.
events_subscriptions_df = events_subscriptions(
    events_path="/user/master/data/geo/events",
    sql=spark,
)
events_subscriptions_df.show()



+------+--------------------+
|  user|               chans|
+------+--------------------+
|100010|[115991, 921975, ...|
|100140|[181194, 272628, ...|
|100227|[480141, 371174, ...|
|100263|[10793, 337169, 3...|
|100320|[420819, 72394, 1...|
|100553|[745258, 918447, ...|
|100704|[278782, 734173, ...|
|100735|[416809, 791826, ...|
|100768|[291826, 559591, ...|
| 10096|[322875, 777239, ...|
|100964|[77250, 995213, 1...|
|101021|[215678, 225805, ...|
|101122|[695155, 293747, ...|
|101205|[317690, 422436, ...|
|101261|[891635, 170338, ...|
|101272|[534280, 486852, ...|
|102113|[411153, 724092, ...|
|102521|[81758, 111882, 6...|
|102536|[141196, 136143, ...|
|102539|[310577, 827713, ...|
+------+--------------------+
only showing top 20 rows





In [9]:
def events_union_sender_receiver(events_path: str, sql) -> DataFrame:
    """Build the set of user pairs that have already exchanged a message.

    Every (``message_from``, ``message_to``) pair with both values present is
    taken in both directions, de-duplicated and encoded as the string
    ``"<sender>-<reciever>"``.

    Args:
        events_path: Path of the events parquet dataset.
        sql: Object exposing a ``.read`` DataFrameReader.

    Returns:
        A persisted DataFrame with the single column
        ``sender_reciever_existing``.
    """
    # Pairs in their original direction: who wrote to whom.
    sent_pairs = (
        sql.read.parquet(events_path)
        .selectExpr("event.message_from as sender", "event.message_to as reciever")
        .where("sender is not null and reciever is not null")
    )

    # The same pairs mirrored. union() matches columns by POSITION, so the
    # swap is done explicitly here instead of relying on mislabelled aliases
    # in a second read of the same dataset.
    mirrored_pairs = sent_pairs.select(
        F.col("reciever").alias("sender"),
        F.col("sender").alias("reciever"),
    )

    contact_keys = (
        sent_pairs
        .union(mirrored_pairs)
        .distinct()
        # encode each pair as "<sender>-<reciever>" and keep only that key
        .select(
            F.concat(F.col("sender"), F.lit("-"), F.col("reciever"))
            .alias("sender_reciever_existing")
        )
        .persist()
    )

    return contact_keys

In [10]:
# Smoke test: build the "already in contact" keys and preview them.
events_union_sender_receiver_df = events_union_sender_receiver(
    events_path="/user/master/data/geo/events",
    sql=spark,
)
events_union_sender_receiver_df.show()



+------------------------+
|sender_reciever_existing|
+------------------------+
|           111641-149488|
|            162624-32874|
|            148130-37048|
|            63964-149488|
|             74053-94916|
|             49528-13098|
|              8988-83233|
|            82108-149488|
|             93872-81848|
|           145291-149488|
|            18561-149488|
|              74682-9550|
|            87276-149488|
|              3175-39696|
|           139087-107225|
|            43551-144277|
|           114517-149488|
|            46022-149488|
|             89683-71316|
|            136390-96491|
+------------------------+
only showing top 20 rows





In [11]:
# Corrected version of the function
def _haversine_km(lat1, lon1, lat2, lon2, earth_radius_km=6371.0):
    """Great-circle distance in kilometres between two points.

    All four arguments are Column expressions holding coordinates in DEGREES;
    they are converted to radians here because the trigonometric functions
    operate on radians.
    """
    lat1_rad, lon1_rad = F.radians(lat1), F.radians(lon1)
    lat2_rad, lon2_rad = F.radians(lat2), F.radians(lon2)
    half_chord_sq = (
        F.pow(F.sin((lat1_rad - lat2_rad) / 2), 2)
        + F.cos(lat1_rad) * F.cos(lat2_rad)
        * F.pow(F.sin((lon1_rad - lon2_rad) / 2), 2)
    )
    return F.lit(2.0 * earth_radius_km) * F.asin(F.sqrt(half_chord_sq))


def recommendations(
    events_transform_from: DataFrame,
    geo_transform: DataFrame,
    events_subscription: DataFrame,
    events_union_sender_receiver: DataFrame,
    max_distance_km: float = 1.0,
    min_common_channels: int = 2,
) -> DataFrame:
    """Recommend pairs of nearby users who share channels but never talked.

    A pair (``user_left``, ``user_right``) is returned when
      * the users differ,
      * their last known positions are at most ``max_distance_km`` apart,
      * no message between them exists in ``events_union_sender_receiver``,
      * they have at least ``min_common_channels`` channels in common.

    Args:
        events_transform_from: One row per user with ``message_from``,
            ``lat_eff`` and ``lon_eff``.
            NOTE(review): coordinates are assumed to be in degrees, like the
            city coordinates in ``geo_transform`` - confirm against the data.
        geo_transform: City reference with ``city``, ``lat_g``, ``lng_g``.
        events_subscription: ``user`` and ``chans`` (list of channels).
        events_union_sender_receiver: Column ``sender_reciever_existing``
            holding ``"<user>-<user>"`` keys of existing contacts.
        max_distance_km: Maximum distance between the two users.
        min_common_channels: Minimum size of the channel intersection.
            NOTE(review): the default of 2 reproduces the original ``> 1``
            filter; pass 1 if a single shared channel should be enough.

    Returns:
        A persisted DataFrame with columns ``user_left``, ``user_right``,
        ``zone_id`` (nearest city), ``timezone``, ``processed_dttm`` and
        ``local_time``.
    """
    # Cities for which local_time is computed; for every other city it is
    # NULL. Presumably these are the cities with a matching "Australia/<city>"
    # time zone - verify if the list is extended.
    cities_with_timezone = (
        'Sydney', 'Melbourne', 'Brisbane', 'Perth',
        'Adelaide', 'Canberra', 'Hobart', 'Darwin',
    )

    # --- 1. pairs of distinct users that are close to each other -----------
    # The users are identified by message_from (the sender id), not by the
    # message id: the later joins compare these values with user ids.
    # Selecting renamed columns gives both sides of the self cross join
    # unambiguous column names.
    left_users = events_transform_from.select(
        F.col("message_from").alias("user_left"),
        F.col("lat_eff").alias("lat_left"),
        F.col("lon_eff").alias("lon_left"),
    )
    right_users = events_transform_from.select(
        F.col("message_from").alias("user_right"),
        F.col("lat_eff").alias("lat_right"),
        F.col("lon_eff").alias("lon_right"),
    )
    nearby_pairs = (
        left_users.crossJoin(right_users)
        # drop self-pairs as early as possible so they never reach the
        # (expensive) cross join with the cities below
        .where(F.col("user_left") != F.col("user_right"))
        .where(
            _haversine_km(
                F.col("lat_left"), F.col("lon_left"),
                F.col("lat_right"), F.col("lon_right"),
            ) <= F.lit(max_distance_km)
        )
        .select(
            "user_left",
            "user_right",
            ((F.col("lat_left") + F.col("lat_right")) / F.lit(2)).alias("middle_point_lat"),
            ((F.col("lon_left") + F.col("lon_right")) / F.lit(2)).alias("middle_point_lon"),
        )
        .distinct()
    )

    # --- 2. nearest city to the midpoint of every pair ----------------------
    pair_city_distance = (
        nearby_pairs
        .crossJoin(geo_transform)
        .withColumn(
            "distance",
            _haversine_km(
                F.col("middle_point_lat"), F.col("middle_point_lon"),
                F.col("lat_g"), F.col("lng_g"),
            ),
        )
        .select("user_left", "user_right", "city", "distance")
    )
    closest_city_first = (
        Window
        .partitionBy("user_left", "user_right")
        .orderBy(F.col("distance").asc())
    )
    pairs_with_city = (
        pair_city_distance
        .withColumn("row_number", F.row_number().over(closest_city_first))
        .filter(F.col("row_number") == 1)
        .drop("row_number", "distance")
        .withColumn("timezone", F.concat(F.lit("Australia/"), F.col("city")))
        .withColumnRenamed("city", "zone_id")
        # same "<user>-<user>" encoding as sender_reciever_existing
        .withColumn(
            "sender_reciever_all",
            F.concat(F.col("user_left"), F.lit("-"), F.col("user_right")),
        )
    )

    # --- 3. remove pairs that have already exchanged messages ---------------
    never_in_contact = pairs_with_city.join(
        events_union_sender_receiver,
        F.col("sender_reciever_all") == F.col("sender_reciever_existing"),
        "leftanti",
    )

    # --- 4. keep pairs with enough channels in common -----------------------
    # Two renamed views of the subscription frame avoid joining the very same
    # DataFrame object twice and having to rename/drop columns in between.
    subs_left = events_subscription.select(
        F.col("user").alias("subscriber_left"),
        F.col("chans").alias("chans_left"),
    )
    subs_right = events_subscription.select(
        F.col("user").alias("subscriber_right"),
        F.col("chans").alias("chans_right"),
    )
    result = (
        never_in_contact
        .join(subs_left, F.col("user_left") == F.col("subscriber_left"), "left")
        .join(subs_right, F.col("user_right") == F.col("subscriber_right"), "left")
        .where(
            F.size(F.array_intersect(F.col("chans_left"), F.col("chans_right")))
            >= F.lit(min_common_channels)
        )
        .select("user_left", "user_right", "zone_id", "timezone")
        .withColumn("processed_dttm", F.current_timestamp())
        .withColumn(
            "local_time",
            F.when(
                F.col("zone_id").isin(*cities_with_timezone),
                F.from_utc_timestamp(F.col("processed_dttm"), F.col("timezone")),
            ).otherwise(None),
        )
        # Only the final result is cached; the intermediate frames above are
        # each consumed once, so persisting them would just hold memory.
        .persist()
    )

    return result

In [None]:
# Smoke test: combine the four prepared frames into friend recommendations
# and preview the result.
recommendations_df = recommendations(
    events_transform_from=events_transform_from_df,
    geo_transform=geo_transform_df,
    events_subscription=events_subscriptions_df,
    events_union_sender_receiver=events_union_sender_receiver_df,
)
recommendations_df.show()



In [None]:
def main(
    events_path: str = "/user/master/data/geo/events/",
    geo_path: str = "/user/denis19/data/geo/cities/actual/geo.csv",
    output_path: str = "/user/denis19/analytics/showcases/",
) -> None:
    """Build the friend-recommendation showcase and write it as parquet.

    Args:
        events_path: Path of the events parquet dataset.
        geo_path: Path of the city reference CSV.
        output_path: Destination directory; existing content is overwritten.

    When run as a spark-submit job the three paths can be forwarded from the
    command line, e.g. ``main(*sys.argv[1:4])``.
    """
    conf = (
        SparkConf()
        .setAppName("showcase_recommendations_to_friends")
        .set("spark.executor.memory", "4g")
        .set("spark.driver.memory", "4g")
    )
    # getOrCreate() reuses a session that is already running in this process
    # (as in this notebook, where `spark` is created in an earlier cell).
    # Constructing a second SparkContext directly would raise an error.
    # NOTE: when an existing session is reused, settings that are fixed at
    # start-up (such as the memory sizes above) do not take effect.
    session = SparkSession.builder.config(conf=conf).getOrCreate()

    geo_transform_df = geo_transform(geo_path, session)
    events_transform_from_df = events_transform_from(events_path, session)
    events_subscriptions_df = events_subscriptions(events_path, session)
    events_union_sender_receiver_df = events_union_sender_receiver(events_path, session)

    recommendations_df = recommendations(
        events_transform_from_df,
        geo_transform_df,
        events_subscriptions_df,
        events_union_sender_receiver_df,
    )

    # DataFrameWriter.parquet() returns nothing, so there is no value to
    # return from main() either.
    recommendations_df.write.mode('overwrite').parquet(output_path)

In [None]:
# Entry point: run the pipeline when this code is executed as the main module.
if __name__ == "__main__":
    main()