In [1]:
import os, sys
os.environ['HADOOP_CONF_DIR'] = '/etc/hadoop/conf'
os.environ['YARN_CONF_DIR'] = '/etc/hadoop/conf'
 
import findspark
findspark.init()
findspark.find()
 
import pyspark.sql.functions as F
from pyspark.sql import SparkSession, DataFrame
from pyspark.context import SparkContext, SparkConf
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType, BooleanType, IntegerType
from pyspark.storagelevel import StorageLevel

import pr7_classes as pr7

In [2]:
# Obtain the Spark session through the project helper in pr7_classes.
# Every later cell uses this `spark` handle.
spark = pr7.get_session_notebook()

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/lib/spark/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2023-04-25 08:49:09,074 WARN util.Utils: Your hostname, fhmev7mr8ojds7g6cgdj resolves to a loopback address: 127.0.1.1; using 172.16.0.23 instead (on interface eth0)
2023-04-25 08:49:09,075 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2023-04-25 08:49:31,839 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to request executors before the AM has registered!


In [3]:
# print_conf is a project helper from pr7_classes
# (presumably prints the session's Spark configuration - confirm there).
pr7.print_conf(spark)
# Print the application id in the same 44-column label layout.
# SparkContext.applicationId is the public accessor for the value that was
# previously read through the private `_jsc` JVM handle.
print("app_id".ljust(44), spark.sparkContext.applicationId)

app_id                                       application_1674128476788_24982


In [None]:
def _run_cities(spark):
    """Compute CitiesRaw and then Cities, calling ``calc(True)`` and ``desc()`` on each."""
    citiesRaw = pr7.CitiesRaw(spark)
    citiesRaw.calc(True)
    citiesRaw.desc()

    cities = pr7.Cities(spark, citiesRaw)
    cities.calc(True)
    cities.desc()


def _run_events_with_cities_all(spark, events_date):
    """Run the chain EventsSource -> EventsRaw -> EventsWithUserAndCoords ->
    EventsWithCitiesPartial -> EventsWithCitiesAll.

    ``events_date`` is forwarded unchanged to ``EventsSource.read``.
    Cities is constructed without an upstream object and loaded with ``read()``.
    """
    eventsSource = pr7.EventsSource(spark)
    eventsSource.read(events_date)
    eventsSource.desc()

    eventsRaw = pr7.EventsRaw(spark, eventsSource)
    eventsRaw.calc(True)
    eventsRaw.desc()

    eventsWithUserAndCoords = pr7.EventsWithUserAndCoords(spark, eventsRaw)
    eventsWithUserAndCoords.calc(True)
    eventsWithUserAndCoords.desc()

    # Cities is read back instead of recomputed (None = no upstream object).
    cities = pr7.Cities(spark, None)
    cities.read()
    cities.desc()

    eventsWithCitiesPartial = pr7.EventsWithCitiesPartial(spark, eventsWithUserAndCoords, cities)
    eventsWithCitiesPartial.calc(True)
    eventsWithCitiesPartial.desc()

    eventsWithCitiesAll = pr7.EventsWithCitiesAll(spark, eventsWithCitiesPartial)
    eventsWithCitiesAll.calc(True)
    eventsWithCitiesAll.desc()


def _run_report4(spark, sample_fraction):
    """Run the Report4 chain from previously stored EventsWithUserAndCoords and
    EventsWithCitiesAll.

    ``sample_fraction`` is forwarded as the second argument of
    ``UserChannelSubscriptions.calc``.
    """
    eventsWithUserAndCoords = pr7.EventsWithUserAndCoords(spark, None)
    eventsWithUserAndCoords.read()
    eventsWithUserAndCoords.desc()

    userChannelSubscriptions = pr7.UserChannelSubscriptions(spark, eventsWithUserAndCoords)
    userChannelSubscriptions.calc(True, sample_fraction)
    userChannelSubscriptions.desc()

    # The same subscriptions object is passed as both inputs.
    userCommonChannels = pr7.UserCommonChannels(spark, userChannelSubscriptions, userChannelSubscriptions)
    userCommonChannels.calc(True)
    userCommonChannels.desc()

    usersCorresponded = pr7.UsersCorresponded(spark, eventsWithUserAndCoords)
    usersCorresponded.calc(True)
    usersCorresponded.desc()

    eventsWithCitiesAll = pr7.EventsWithCitiesAll(spark, None)
    eventsWithCitiesAll.read()
    eventsWithCitiesAll.desc()

    # Two separate Users objects are created for the two inputs of UsersNear.
    usersLeft = pr7.Users(spark, eventsWithCitiesAll)
    usersLeft.read()

    usersRight = pr7.Users(spark, eventsWithCitiesAll)
    usersRight.read()

    usersNear = pr7.UsersNear(spark, usersLeft, usersRight)
    usersNear.calc(True)
    usersNear.desc()

    report4 = pr7.Report4(spark, userCommonChannels, usersCorresponded, usersNear)
    report4.calc(True)


def main(spark, dfname, events_date='2022-03-01', sample_fraction=0.1):
    """Run one of the pr7 pipelines, selected by name.

    Parameters
    ----------
    spark
        Spark session handed to every pr7 object.
    dfname : str
        One of ``'Cities'``, ``'EventsWithCitiesAll'`` or ``'Report4'``
        (case-sensitive).
    events_date : str, optional
        Value passed to ``EventsSource.read`` in the ``'EventsWithCitiesAll'``
        pipeline. Defaults to the previously hard-coded ``'2022-03-01'``.
    sample_fraction : float, optional
        Second argument of ``UserChannelSubscriptions.calc`` in the
        ``'Report4'`` pipeline. Defaults to the previously hard-coded ``0.1``.

    Returns
    -------
    None

    Raises
    ------
    ValueError
        If ``dfname`` is not one of the supported names. Previously such a
        call returned silently without doing any work, which hid typos.
    """
    if dfname == 'Cities':
        _run_cities(spark)
    elif dfname == 'EventsWithCitiesAll':
        _run_events_with_cities_all(spark, events_date)
    elif dfname == 'Report4':
        _run_report4(spark, sample_fraction)
    else:
        raise ValueError(
            "Unknown dfname %r; expected one of 'Cities', 'EventsWithCitiesAll', 'Report4'" % (dfname,)
        )

In [4]:
# Load the source events through the project reader and print its description.
# NOTE(review): main() passes '2022-03-01' to EventsSource.read while this cell
# passes '2022-04-01' - confirm which date is intended.
eventsSource = pr7.EventsSource(spark)
eventsSource.read('2022-04-01')
eventsSource.desc()

2023-04-25 08:51:37,244 WARN datasources.SharedInMemoryCache: Evicting cached table partition metadata from memory due to size constraints (spark.sql.hive.filesourcePartitionFileCacheSize = 262144000 bytes). This may impact query planning performance.
[Stage 3:>                                                          (0 + 1) / 1]

Count: 9958321 Layer: -source Name: events
RDD Name: events
Partitons cnt: 932
root
 |-- event_type: string (nullable = true)
 |-- message_from: long (nullable = true)
 |-- message_to: long (nullable = true)
 |-- reaction_from: string (nullable = true)
 |-- user: string (nullable = true)
 |-- subscription_channel: long (nullable = true)
 |-- date: date (nullable = true)
 |-- lat: double (nullable = true)
 |-- lon: double (nullable = true)
 |-- message_id: long (nullable = true)
 |-- datetime: string (nullable = true)



                                                                                

In [None]:
# Derive EventsRaw from the eventsSource object created in the previous cell.
# NOTE(review): calc() is called here without the True flag that main() passes
# to EventsRaw.calc - confirm this is intentional.
eventsRaw = pr7.EventsRaw(spark, eventsSource)
eventsRaw.calc()
eventsRaw.desc()

# Wire up the downstream pipeline objects; only constructors are called here,
# none of these objects has calc() or read() invoked in this cell.
# NOTE(review): eventsWithUserAndCoords, cities and users are not assigned by
# any earlier cell of this notebook (the first two are only locals inside
# main(); users is not assigned anywhere shown), so on a fresh kernel this
# presumably raises NameError - verify the intended execution order.
eventsWithCitiesPartial = pr7.EventsWithCitiesPartial(spark, eventsWithUserAndCoords, cities) #events_full_coords
eventsWithCitiesAll = pr7.EventsWithCitiesAll(spark, eventsWithCitiesPartial)
registrationsWithCities = pr7.RegistrationsWithCities(spark, eventsWithCitiesAll)
eventsWithRegsWithCities = pr7.EventsWithRegsWithCities(spark, eventsWithCitiesAll, registrationsWithCities)

# Travel and report objects built on top of eventsWithCitiesAll.
userTravelCities = pr7.UserTravelCities(spark, eventsWithCitiesAll)
userTravels = pr7.UserTravels(spark, userTravelCities)
report3 = pr7.Report3(spark, eventsWithRegsWithCities)
report2 = pr7.Report2(spark, users, userTravels)

2023-04-25 09:19:25,729 WARN execution.CacheManager: Asked to cache already cached data.
2023-04-25 09:19:26,564 WARN scheduler.TaskSetManager: Stage 4 contains a task of very large size (5876 KiB). The maximum recommended task size is 1000 KiB.
2023-04-25 09:28:04,529 WARN scheduler.TaskSetManager: Stage 5 contains a task of very large size (5876 KiB). The maximum recommended task size is 1000 KiB.
2023-04-25 09:29:22,665 WARN storage.BlockManagerMasterEndpoint: No more replicas available for rdd_8_929 !
2023-04-25 09:29:22,665 WARN storage.BlockManagerMasterEndpoint: No more replicas available for rdd_8_842 !
2023-04-25 09:29:22,665 WARN storage.BlockManagerMasterEndpoint: No more replicas available for rdd_8_745 !
2023-04-25 09:29:22,665 WARN storage.BlockManagerMasterEndpoint: No more replicas available for rdd_8_11 !
2023-04-25 09:29:22,665 WARN storage.BlockManagerMasterEndpoint: No more replicas available for rdd_8_50 !
2023-04-25 09:29:22,665 WARN storage.BlockManagerMasterEndp

In [None]:
# Sanity check: show the number of rows per event_type.
# NOTE(review): the previous cell only constructs eventsWithRegsWithCities;
# whether .df is populated without calc()/read() depends on pr7_classes - confirm.
eventsWithRegsWithCities.df.groupBy('event_type').count().show()

In [None]:
# Rebind eventsRaw to a new object with no upstream source (None) and load it
# with read('2022-08-01') instead of computing it.
# NOTE(review): this date differs from the ones used for EventsSource.read
# elsewhere in the notebook - confirm it is the intended one.
eventsRaw = pr7.EventsRaw(spark, None)
eventsRaw.read('2022-08-01')
eventsRaw.desc()

In [None]:
# Compute EventsWithUserAndCoords from the eventsRaw object of the previous
# cell, passing the same True flag that main() uses, then print its description.
eventsWithUserAndCoords = pr7.EventsWithUserAndCoords(spark, eventsRaw)
eventsWithUserAndCoords.calc(True)
eventsWithUserAndCoords.desc()

In [None]:
# Build UserChannelSubscriptions on top of eventsWithUserAndCoords.
# NOTE(review): read() is immediately followed by calc(); presumably calc()
# replaces whatever read() loaded - confirm whether both calls are needed.
userChannelSubscriptions = pr7.UserChannelSubscriptions(spark, eventsWithUserAndCoords)
userChannelSubscriptions.read()
# For testing, keep only a 10% sample in userChannelSubscriptions
userChannelSubscriptions.calc(True, 0.1)
userChannelSubscriptions.desc()

In [None]:
# Redistribute the subscriptions frame over 600 partitions and mark it cached.
userChannelSubscriptions.df = userChannelSubscriptions.df.repartition(600).cache()
# count() is an action, so it evaluates the cached frame; the row count is the cell output.
userChannelSubscriptions.df.count()

In [None]:
# Build UserCommonChannels with the subscriptions object passed as both inputs.
userCommonChannels = pr7.UserCommonChannels(spark, userChannelSubscriptions, userChannelSubscriptions)
# NOTE(review): the name is rebound straight away to an object with no inputs
# (None, None), which discards the object created on the previous line. Then
# read() and calc() are both called; calc() on an object without inputs may
# fail or recompute nothing - confirm which variant is meant to stay.
userCommonChannels = pr7.UserCommonChannels(spark, None, None)
userCommonChannels.read()
userCommonChannels.calc()
userCommonChannels.desc()

In [None]:
# Rebind eventsWithUserAndCoords to an object with no upstream source (None)
# and load it with read() instead of computing it, as main() does for Report4.
eventsWithUserAndCoords = pr7.EventsWithUserAndCoords(spark, None) 
eventsWithUserAndCoords.read()
eventsWithUserAndCoords.desc()

In [None]:
# Manual, step-by-step build of the "users who corresponded" pair frame.
usersCorresponded = pr7.UsersCorresponded(spark, eventsWithUserAndCoords)
# NOTE(review): the name is rebound to an object created with None as its
# source, and the next statement dereferences
# usersCorresponded.eventsWithUserAndCoords.df. If the constructor stores its
# argument as-is, that attribute is None and this fails - confirm which
# constructor call is meant to stay.
usersCorresponded = pr7.UsersCorresponded(spark, None)
# Keep only message events that have a recipient.
usersCorresponded.df = usersCorresponded.eventsWithUserAndCoords.df.where(" event_type = 'message' and message_to is not null ")

# Sender (user_id) becomes user_left, recipient (message_to cast to string)
# becomes user_right; keep distinct pairs only.
usersCorresponded.df = usersCorresponded.df.withColumnRenamed('user_id', 'user_left')\
                         .withColumn('user_right', F.col('message_to').cast(StringType()))\
                         .select('user_left', 'user_right').distinct()

# Normalise pair order so that user_left <= user_right: temp_left keeps the
# original left value while user_left is overwritten with the smaller of the
# two, then user_right receives the larger one. Both columns are strings here
# (user_right is cast above), so if user_left is also a string the comparison
# is lexicographic - TODO confirm.
usersCorresponded.df = usersCorresponded.df.withColumn('temp_left', F.col('user_left'))
usersCorresponded.df = usersCorresponded.df.withColumn('user_left', F.when(F.col('user_left')>F.col('user_right'), F.col('user_right')).otherwise(F.col('user_left')))
usersCorresponded.df = usersCorresponded.df.withColumn('user_right', F.when(F.col('temp_left')>F.col('user_right'), F.col('temp_left')).otherwise(F.col('user_right')))
usersCorresponded.df = usersCorresponded.df.drop('temp_left')
# After reordering, (a, b) and (b, a) collapse to the same row, so deduplicate again.
usersCorresponded.df = usersCorresponded.df.select('user_left', 'user_right').distinct()
        
# NOTE(review): read() and calc() are called after the manual build above;
# presumably each of them replaces usersCorresponded.df, which would make the
# manual steps redundant - confirm.
usersCorresponded.read()
usersCorresponded.calc()
usersCorresponded.desc()

In [None]:
# Users object used as the first (left) input of UsersNear; created with no
# upstream source (None) and loaded with read().
usersLeft = pr7.Users(spark, None)
usersLeft.read()
usersLeft.desc()

In [None]:
# Second, independently loaded Users object used as the right input of UsersNear.
usersRight = pr7.Users(spark, None)
usersRight.read()
usersRight.desc()

In [None]:
# Build UsersNear from the two Users objects loaded above.
usersNear = pr7.UsersNear(spark, usersLeft, usersRight)
# NOTE(review): the name is rebound straight away to an object with no inputs
# (None, None), which discards the object created on the previous line. Then
# read() is followed by calc(True); calc() without inputs may fail - confirm
# which variant is meant to stay.
usersNear = pr7.UsersNear(spark, None, None)
usersNear.read()
usersNear.calc(True)
usersNear.desc()

In [None]:
# Print up to 40 rows of the usersNear frame for a visual check.
usersNear.df.show(40)

In [None]:
# Build Report4 from the three inputs and run its own calc(True) / desc().
report4 = pr7.Report4(spark, userCommonChannels, usersCorresponded, usersNear)
report4.calc(True)
report4.desc()
# NOTE(review): everything below overwrites report4.df by hand after calc(True)
# has already run; it looks like an inlined/debug version of the calculation -
# confirm whether both are needed.
# Start from the near-user pairs, exposing city_id_left under the name zone_id.
report4.df = usersNear.df.selectExpr('user_left', 'user_right', 'city_left', 'city_id_left as zone_id')
# Rename the pair columns on userCommonChannels.df so they do not clash with
# report4.df's columns in the join. This mutates the shared userCommonChannels
# object, so later cells see the renamed columns.
userCommonChannels.df = userCommonChannels.df.withColumnRenamed('user_left', 'ucc_user_left')
userCommonChannels.df = userCommonChannels.df.withColumnRenamed('user_right', 'ucc_user_right')

# Inner join: keep only near pairs that also appear in userCommonChannels.
report4.df = report4.df.join(userCommonChannels.df, \
                    [report4.df.user_left == userCommonChannels.df.ucc_user_left, 
                     report4.df.user_right == userCommonChannels.df.ucc_user_right], 'inner')

# Left-anti join: drop pairs that appear in usersCorresponded.
report4.df = report4.df.join(usersCorresponded.df, \
                    [report4.df.user_left == usersCorresponded.df.user_left, 
                     report4.df.user_right == usersCorresponded.df.user_right], 'leftanti')
    
# Remove the helper columns; 'subscription_channel' presumably comes from
# userCommonChannels.df - confirm.
report4.df = report4.df.drop('city_left', 'subscription_channel', 'ucc_user_left', 'ucc_user_right')
report4.df.printSchema()

report4.df = report4.df.cache() ##!! cache
                
# Persist the frame with the project Saver, using the layer and path held by report4.
pr7.Saver.save(report4.df, report4.layer, report4.path)

In [None]:
# Alternative manual build of report4.df that also adds timestamps.
report4.df = usersNear.df.select('user_left', 'user_right', 'city_left', 'city_id_left')
# Left join against the corresponded pairs.
# NOTE(review): this expects columns uc_user_left / uc_user_right on
# usersCorresponded.df, but the cells shown in this notebook only produce
# user_left / user_right there - confirm where the uc_* columns come from.
report4.df = report4.df.join(usersCorresponded.df, \
                    [report4.df.user_left == usersCorresponded.df.uc_user_left, 
                     report4.df.user_right == usersCorresponded.df.uc_user_right], 'left')
report4.df = report4.df.withColumn('processed_dttm', F.current_timestamp())
# local_time: for the eight listed cities, convert processed_dttm from UTC to
# the time zone named 'Australia/<city_left>'; for any other city it is null.
report4.df = report4.df.withColumn('local_time', \
                    F.when( (F.col('city_left') == 'Sydney') | (F.col('city_left') == 'Melbourne') | \
                            (F.col('city_left') == 'Brisbane') | (F.col('city_left') == 'Perth') | \
                            (F.col('city_left') == 'Adelaide') | (F.col('city_left') == 'Canberra') | \
                            (F.col('city_left') == 'Hobart') | (F.col('city_left') == 'Darwin')  \
                    , F.from_utc_timestamp(F.col('processed_dttm'), F.concat(F.lit('Australia/'), F.col('city_left'))))\
                     .otherwise(None))

# Keep only rows with no match in the left join above (pairs that did not
# correspond), then drop the join helper columns.
report4.df = report4.df.where("uc_user_left is null")
report4.df = report4.df.drop('uc_user_left', 'uc_user_right', 'city_left')
report4.df = report4.df.withColumnRenamed('city_id_left', 'zone_id')

report4.df = report4.df.cache()

# count() is an action, so it forces evaluation of the cached frame; the value
# itself is not used again in this cell.
count= report4.df.count()

# NOTE(review): read() presumably replaces report4.df with previously stored
# data, in which case show(200) displays the stored report rather than the
# frame built above - confirm.
report4.read()
report4.df.show(200)