In [4]:
from datetime import datetime, timedelta
import tzwhere

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window



# Local SparkSession used for the whole notebook.
# NOTE(review): per the Spark configuration docs, spark.driver.cores only takes effect
# in cluster mode, so with a local[*] master it is most likely a no-op.
spark = SparkSession.builder \
    .appName("de-project-7") \
    .master("local[*]") \
    .config("spark.driver.memory", "10g") \
    .config("spark.driver.cores", 35) \
    .getOrCreate()

22/10/14 16:19:45 WARN Utils: Your hostname, fhmmgutu3rdhsp9gl3m3 resolves to a loopback address: 127.0.1.1; using 172.16.0.30 instead (on interface eth0)
22/10/14 16:19:45 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/10/14 16:19:47 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
# Show the active SparkSession as this cell's output (interactive inspection only).
spark

In [5]:
def input_paths(date: str, depth: int, namenode_url: str, input_path: str):
    """Build the daily partition paths for the `depth` days ending on `date`.

    Args:
        date: most recent day to include, formatted 'YYYY-MM-DD'.
        depth: number of consecutive days to include, counting back from `date`
            (`date` itself is offset 0).
        namenode_url: prefix prepended verbatim, e.g. 'hdfs://host:8020'.
        input_path: dataset root; each partition is '<root>/date=YYYY-MM-DD'.

    Returns:
        List of full partition paths, newest day first. Empty when depth <= 0
        (in that case `date` is not parsed at all).
    """
    offsets = range(depth)
    if not offsets:
        return []
    last_day = datetime.strptime(date, '%Y-%m-%d')
    result = []
    for offset in offsets:
        day_label = (last_day - timedelta(days=offset)).strftime('%Y-%m-%d')
        result.append(f'{namenode_url}{input_path}/date={day_label}')
    return result

# Run parameters: read `depth` daily event partitions ending on `date` (inclusive).
date, depth = '2022-03-31', 60
namenode_url = 'hdfs://rc1a-dataproc-m-dg5lgqqm7jju58f9.mdb.yandexcloud.net:8020'
base_input_path = '/user/master/data/geo/events'

paths = input_paths(
    date=date,
    depth=depth,
    namenode_url=namenode_url,
    input_path=base_input_path,
)

In [6]:
# City reference table (prepared once from the raw CSV, see the next cell).
geo = spark.read.parquet(namenode_url + '/user/enpassan/dev/geo_data_correct')

# Events for the selected days; event_id gives every row a unique key that the
# nearest-city window partitions by.
messages = spark.read.parquet(*paths) \
    .withColumn("event_id", F.monotonically_increasing_id())

                                                                                

In [None]:
# One-off preparation step, kept commented out for provenance: the raw geo CSV uses
# ';' as the delimiter and ',' as the decimal separator, so lat/lng were converted to
# double columns lat_c / lon_c, id was cast to int, and the result was saved as the
# geo_data_correct parquet dataset that the previous cell reads.
# #save corrected csv as parquet
#
# geo = spark.read.options(header = "true", delimiter = ";").csv(f'{namenode_url}/user/enpassan/dev/geo_data')\
#     .withColumn("lat_c", F.regexp_replace("lat", ",", ".").cast("double"))\
#     .withColumn("lon_c", F.regexp_replace("lng", ",", ".").cast("double"))\
#     .withColumn("id", F.col("id").cast('int'))\
#     .drop("lat")\
#     .drop("lng")#to_parquet?

# geo.write.mode("overwrite").parquet(f'{namenode_url}/user/enpassan/dev/geo_data_correct')
# Inspect the prepared city table's schema.
geo.printSchema()

In [7]:
# Rank the candidate cities of every event by great-circle distance (haversine
# formula), so that row_number() == 1 over this window picks the nearest city.
# Spark's sin/cos/asin work in radians, so the coordinates are converted with
# F.radians first.
# NOTE(review): assumes messages.lat/lon and geo.lat_c/lon_c are decimal degrees;
# confirm against the source data.
# The 2 * R earth-radius factor is omitted: it is a positive constant and does not
# change the ordering. abs() is unnecessary because asin of a non-negative sqrt is
# non-negative. The sqrt is clamped to 1.0 so floating-point rounding can never push
# asin out of its domain (which would yield NaN).
_msg_lat = F.radians(messages.lat)
_msg_lon = F.radians(messages.lon)
_city_lat = F.radians(geo.lat_c)
_city_lon = F.radians(geo.lon_c)
_haversine = (
    F.sin((_msg_lat - _city_lat) / 2) ** 2
    + F.cos(_msg_lat) * F.cos(_city_lat) * F.sin((_msg_lon - _city_lon) / 2) ** 2
)
window = (
    Window
    .partitionBy(messages.event_id)
    .orderBy(F.asc(F.asin(F.least(F.lit(1.0), F.sqrt(_haversine)))))
)

In [None]:
# Inspect the nearest-city WindowSpec (interactive debugging output only).
window

In [None]:
# Pair every event with every city, keep only the nearest city per event (rank 1
# under `window`), then drop the helper ranking column. The original line ended in a
# truncated `.with`, which raises AttributeError on a DataFrame.
mes_geo = (
    messages
    .join(geo, how='cross')
    .withColumn("rn", F.row_number().over(window))
    .where("rn == 1")
    .drop("rn")
)

In [None]:
# Persist the event -> nearest-city result so later cells can reload it cheaply.
mes_geo.write.parquet(f"{namenode_url}/user/enpassan/dev/mes_geo", mode="overwrite")

In [None]:
# Preview the first rows of the event -> nearest-city result.
mes_geo.show(n=20)

In [9]:
# Reload the persisted result instead of recomputing the cross join.
mes_geo = spark.read.format("parquet").load(f"{namenode_url}/user/enpassan/dev/mes_geo")

In [None]:
# Inspect the schema of the reloaded event -> nearest-city dataset.
mes_geo.printSchema()

In [10]:
# One row per sent message: sender, send time (parsed to a timestamp), and the
# nearest city's id, name and coordinates.
user_geo = (
    mes_geo
    .filter(F.col("event_type") == "message")
    .selectExpr("event.message_from as user_id", "event.datetime as dt", "id as city_id", "city as city_name", "lat_c", "lon_c")
    .withColumn("dt", F.to_timestamp(F.col("dt"), "yyyy-MM-dd HH:mm:ss"))
)

In [None]:
# Inspect the per-message schema built in the previous cell.
user_geo.printSchema()

In [11]:
# Number each user's messages from newest (rn = 1) to oldest.
window_act_city = Window.partitionBy("user_id").orderBy(F.col("dt").desc())

# Messages without a parsable send time are excluded: they cannot be ordered.
user_city = (
    user_geo
    .filter(F.col("dt").isNotNull())
    .withColumn("rn", F.row_number().over(window_act_city))
)

In [12]:
from timezonefinder import TimezoneFinder
from pyspark.sql import types as T
from cachetools import cached

@cached(cache={})
def get_tf():
    """Return a lazily created, memoized TimezoneFinder.

    `cached(cache={})` stores the result of this zero-argument call in the given dict,
    so repeated calls within the same Python process (e.g. from the get_tz UDF) reuse
    one instance instead of constructing a new finder each time.
    NOTE(review): each Spark Python worker presumably holds its own copy of the cache;
    confirm if instance count matters.
    """
    return TimezoneFinder()

In [23]:
@F.udf(returnType = T.StringType())
def get_tz(lng, lat):
    """Spark UDF: time-zone name for a (longitude, latitude) pair.

    Args:
        lng: longitude value of the row (may be null).
        lat: latitude value of the row (may be null).

    Returns:
        The string returned by TimezoneFinder.timezone_at, or None when either
        coordinate is null. Previously a null coordinate (e.g. a geo value that failed
        the double cast) was passed straight to timezone_at and raised inside the UDF,
        failing the whole Spark job.
    """
    if lng is None or lat is None:
        return None
    return get_tf().timezone_at(lng=lng, lat=lat)

In [None]:
# Smoke-test the time-zone UDF on one example coordinate pair. A Spark UDF must be
# applied to Columns inside a DataFrame; the previous call `get_tz(143, )` passed a
# bare int (and omitted `lat`), which raises TypeError.
spark.range(1).select(get_tz(F.lit(143.0), F.lit(-37.8)).alias("tz")).show()

In [29]:
# Current city: the city of each user's newest message (rn == 1), plus that message's
# time shifted into the city's time zone.
# NOTE(review): from_utc_timestamp assumes dt is stored in UTC; confirm.
user_act_city = (
    user_city
    .filter(F.col("rn") == 1)
    .withColumn(
        "local_time",
        F.from_utc_timestamp(F.col("dt"), get_tz(user_city.lon_c, user_city.lat_c)),
    )
    .select("user_id", F.col("city_name").alias("act_city"), "local_time")
)

In [31]:
# Home city: for every message, measure the gap in days until the user's next (later)
# message. Messages followed by a gap of more than 27 days qualify, and the city of the
# most recent qualifying message becomes the user's home city.
# The newest message has no later message, so lag() returns null there and the row is
# filtered out. Users with no qualifying gap (e.g. a single message) get
# home_city = null after the left join into the datamart.
# NOTE(review): the gap ignores whether the next message came from the same city;
# confirm this matches the intended "stayed > 27 days" definition.
user_home_city = (
    user_city
    # window_act_city orders by dt descending, so lag("dt", 1) is the next *later* dt.
    # Subtracting epoch seconds avoids casting the timestamp-difference interval type.
    .withColumn(
        "days_stayed",
        (F.lag("dt", 1).over(window_act_city).cast("long") - F.col("dt").cast("long")) / 86400,
    )
    .where(F.col("days_stayed") > 27)
    # A user can have several qualifying cities. Keep only the most recent one
    # (smallest rn) so the left join into the datamart stays one row per user.
    .withColumn("home_rn", F.row_number().over(Window.partitionBy("user_id").orderBy("rn")))
    .where(F.col("home_rn") == 1)
    .select("user_id", F.col("city_name").alias("home_city"))
)

In [32]:
# Messages without a send time are excluded: without dt the order of visited cities
# is unknown, so such records are useless for the travel history.
# travel_array lists the city of every timestamped message, newest first;
# travel_count is the length of that array.
# NOTE(review): travel_count counts messages, not distinct cities; confirm intent.
user_cities_visited = (
    user_geo
    .filter(F.col("dt").isNotNull())
    .groupBy("user_id")
    .agg(
        F.sort_array(F.collect_list(F.struct("dt", "city_name")), asc=False)
        .alias("collected")
    )
    .withColumn("travel_array", F.col("collected.city_name"))
    .drop("collected")
    .withColumn("travel_count", F.size("travel_array"))
)

In [33]:
# Datamart 1: current city and local time, home city, and travel history per user.
# Left joins keep every user that has a current city, even without home/travel data.
user_city_info = (
    user_act_city
    .join(user_home_city, "user_id", "left")
    .join(user_cities_visited, "user_id", "left")
)

In [34]:
# Preview the first rows of datamart 1.
user_city_info.show()



+-------+-----------+-------------------+---------+--------------------+------------+
|user_id|   act_city|         local_time|home_city|        travel_array|travel_count|
+-------+-----------+-------------------+---------+--------------------+------------+
|     65|  Newcastle|2022-02-01 02:13:26|     null|         [Newcastle]|           1|
|      7|  Melbourne|2022-03-29 13:22:13|     null|         [Melbourne]|           1|
|     84|Rockhampton|2022-02-09 05:08:14|     null|       [Rockhampton]|           1|
|     68|  Newcastle|2022-03-26 04:08:56|     null|         [Newcastle]|           1|
|     27| Cranbourne|2022-03-27 02:55:40|     null|        [Cranbourne]|           1|
|     17|   Canberra|2022-03-26 00:29:27|     null|          [Canberra]|           1|
|     41|    Bendigo|2022-02-06 04:44:00|     null|           [Bendigo]|           1|
|     85|     Darwin|2022-03-24 15:09:49|     null|            [Darwin]|           1|
|     61| Townsville|2022-03-10 15:44:58|     null|   

                                                                                

In [None]:
# mes_geo.printSchema()

# mes_geo.write.mode("overwrite").parquet(f"{namenode_url}/user/enpassan/dev/mes_geo")

# !hdfs dfs -ls /user/enpassan/dev/mes_geo

In [None]:
# !hdfs dfs -du -s -h /user/enpassan/dev/mes_geo

In [None]:
# Reload the event -> nearest-city dataset, exposing the geo table's `id` under the
# clearer name `city_id` (appended as the last column; `id` is dropped).
mes_geo = (
    spark.read
    .parquet(f"{namenode_url}/user/enpassan/dev/mes_geo")
    .withColumn("city_id", F.col("id"))
    .drop("id")
)

In [None]:
# Number of rows (events) assigned to each nearest city.
(
    mes_geo
    .groupBy("city_id")
    .agg(F.count(F.lit(1)).alias("count"))
    .show()
)