# Описание проекта

Коллеги из другого проекта по просьбе нашей команды начали вычислять координаты событий (сообщений, подписок, реакций, регистраций), которые совершили пользователи соцсети. Значения координат будут появляться в таблице событий. Пока определяется геопозиция только исходящих сообщений, но уже сейчас можно начать разрабатывать новый функционал. 
В продукт планируют внедрить систему рекомендации друзей. Приложение будет предлагать пользователю написать человеку, если пользователь и адресат:
* состоят в одном канале,
* раньше никогда не переписывались,
* находятся не дальше 1 км друг от друга.
При этом команда хочет лучше изучить аудиторию соцсети, чтобы в будущем запустить монетизацию. Для этого было решено провести геоаналитику:
* Выяснить, где находится большинство пользователей по количеству сообщений, лайков и подписок из одной точки.
* Посмотреть, в какой точке Австралии регистрируется больше всего новых пользователей.
* Определить, как часто пользователи путешествуют и какие города выбирают.
Благодаря такой аналитике в соцсеть можно будет вставить рекламу: приложение сможет учитывать местонахождение пользователя и предлагать тому подходящие услуги компаний-партнёров. 

## Создать витрину в разрезе пользователей

Определите, в каком городе было совершено событие. С этим нам поможет список городов из файла geo.csv. В нём указаны координаты центра города.

Найдите расстояние от координаты отправленного сообщения до центра города. Событие относится к тому городу, расстояние до которого наименьшее.

Витрина должна содержать следующие атрибуты:

* user_id — идентификатор пользователя.
* act_city — актуальный адрес. Это город, из которого было отправлено последнее сообщение.
* home_city — домашний адрес. Это последний город, в котором пользователь был дольше 27 дней.
* travel_count — количество посещённых городов. Если пользователь побывал в каком-то городе повторно, то это считается за отдельное посещение.
* travel_array — список городов в порядке посещения (не понял как реализовать).
* TIME_UTC — время в таблице событий. Указано в UTC+0.
* timezone — актуальный адрес. Атрибуты содержатся в виде Australia/Sydney.

Создадим подключение:

In [None]:
import os
os.environ['HADOOP_CONF_DIR'] = '/etc/hadoop/conf'
os.environ['YARN_CONF_DIR'] = '/etc/hadoop/conf'
 
import findspark
findspark.init()
findspark.find()

import datetime
import pyspark.sql.functions as F
from pyspark.sql.window import Window

import pyspark
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
spark = SparkSession \
    .builder \
    .master("yarn") \
        .config("spark.driver.cores", "2") \
        .config("spark.driver.memory", "4g") \
        .appName("test1337") \
        .getOrCreate()

Откроем файл geo.
Я не знал, как в PySpark запятую в координатах заменить точкой, чтобы присвоить корректный тип данных и пересохранил файл с точкой через pandas и назвал его geo_2:

In [27]:
import pandas as pd
df=pd.read_csv("geo.csv", sep=';', decimal=',')
df

Unnamed: 0,id,city,lat,lng
0,1,Sydney,-33.865,151.2094
1,2,Melbourne,-37.8136,144.9631
2,3,Brisbane,-27.4678,153.0281
3,4,Perth,-31.9522,115.8589
4,5,Adelaide,-34.9289,138.6011
5,6,Gold Coast,-28.0167,153.4
6,7,Cranbourne,-38.0996,145.2834
7,8,Canberra,-35.2931,149.1269
8,9,Newcastle,-32.9167,151.75
9,10,Wollongong,-34.4331,150.8831


In [None]:
df.to_csv('geo_2.csv', index=False)

In [28]:
geo_2 = spark.read.csv("/user/antonnnbis/tmp/geo_2.csv", sep=",", inferSchema=True, header=True)
geo_2.show(n = 30, truncate = False)
geo_2.printSchema()

[Stage 227:>                                                        (0 + 1) / 1]

+---+-----------+--------+--------+
|id |city       |lat     |lng     |
+---+-----------+--------+--------+
|1  |Sydney     |-33.865 |151.2094|
|2  |Melbourne  |-37.8136|144.9631|
|3  |Brisbane   |-27.4678|153.0281|
|4  |Perth      |-31.9522|115.8589|
|5  |Adelaide   |-34.9289|138.6011|
|6  |Gold Coast |-28.0167|153.4   |
|7  |Cranbourne |-38.0996|145.2834|
|8  |Canberra   |-35.2931|149.1269|
|9  |Newcastle  |-32.9167|151.75  |
|10 |Wollongong |-34.4331|150.8831|
|11 |Geelong    |-38.15  |144.35  |
|12 |Hobart     |-42.8806|147.325 |
|13 |Townsville |-19.2564|146.8183|
|14 |Ipswich    |-27.6167|152.7667|
|15 |Cairns     |-16.9303|145.7703|
|16 |Toowoomba  |-27.5667|151.95  |
|17 |Darwin     |-12.4381|130.8411|
|18 |Ballarat   |-37.55  |143.85  |
|19 |Bendigo    |-36.75  |144.2667|
|20 |Launceston |-41.4419|147.145 |
|21 |Mackay     |-21.1411|149.1861|
|22 |Rockhampton|-23.375 |150.5117|
|23 |Maitland   |-32.7167|151.55  |
|24 |Bunbury    |-33.3333|115.6333|
+---+-----------+--------+--

                                                                                

Создал справочник для заполнения атрибута timezone:

In [4]:
data_geo = [('Australia/Sydney', 1),
        ('Australia/Melbourne', 2),
        ('Australia/Brisbane', 3),
        ('Australia/Perth', 4),
        ('Australia/Adelaide', 5),
        ('Australia/Brisbane', 6),
        ('Australia/Melbourne', 7),
        ('Australia/Sydney', 8),
        ('Australia/Sydney', 9),
        ('Australia/Sydney', 10),
        ('Australia/Melbourne', 11),
        ('Australia/Hobart', 12),
        ('Australia/Brisbane', 13),
        ('Australia/Brisbane', 14),
        ('Australia/Brisbane', 15),
        ('Australia/Brisbane', 16),
        ('Australia/Darwin', 17),
        ('Australia/Melbourne', 18),
        ('Australia/Melbourne', 19),
        ('Australia/Hobart', 20),
        ('Australia/Brisbane', 21),
        ('Australia/Brisbane', 22),
        ('Australia/Sydney', 23),
        ('Australia/Perth', 24)
]

columns_geo = ['timezone', 'city_id']
df_timezone = spark.createDataFrame(data=data_geo, schema=columns_geo)
df_timezone.show(30)

+-------------------+-------+
|           timezone|city_id|
+-------------------+-------+
|   Australia/Sydney|      1|
|Australia/Melbourne|      2|
| Australia/Brisbane|      3|
|    Australia/Perth|      4|
| Australia/Adelaide|      5|
| Australia/Brisbane|      6|
|Australia/Melbourne|      7|
|   Australia/Sydney|      8|
|   Australia/Sydney|      9|
|   Australia/Sydney|     10|
|Australia/Melbourne|     11|
|   Australia/Hobart|     12|
| Australia/Brisbane|     13|
| Australia/Brisbane|     14|
| Australia/Brisbane|     15|
| Australia/Brisbane|     16|
|   Australia/Darwin|     17|
|Australia/Melbourne|     18|
|Australia/Melbourne|     19|
|   Australia/Hobart|     20|
| Australia/Brisbane|     21|
| Australia/Brisbane|     22|
|   Australia/Sydney|     23|
|    Australia/Perth|     24|
+-------------------+-------+



Соединил созданный справочник с таблицей geo:

In [29]:
geo = geo_2.join(df_timezone, geo_2.id == df_timezone.city_id, how='inner').drop(df_timezone.city_id)
geo.show(30)

2024-01-14 18:31:41,432 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 230.0 (TID 21365, rc1a-dataproc-d-5w4oxa2s5b8foehs.mdb.yandexcloud.net, executor 5): java.io.EOFException
	at java.io.DataInputStream.readInt(DataInputStream.java:392)
	at org.apache.spark.api.python.PythonWorkerFactory.createSocket$1(PythonWorkerFactory.scala:120)
	at org.apache.spark.api.python.PythonWorkerFactory.liftedTree1$1(PythonWorkerFactory.scala:136)
	at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:135)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:105)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:119)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:131)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
	at org.apache.spark.rdd.

+---+-----------+--------+--------+-------------------+
| id|       city|     lat|     lng|           timezone|
+---+-----------+--------+--------+-------------------+
|  1|     Sydney| -33.865|151.2094|   Australia/Sydney|
|  2|  Melbourne|-37.8136|144.9631|Australia/Melbourne|
|  3|   Brisbane|-27.4678|153.0281| Australia/Brisbane|
|  4|      Perth|-31.9522|115.8589|    Australia/Perth|
|  5|   Adelaide|-34.9289|138.6011| Australia/Adelaide|
|  6| Gold Coast|-28.0167|   153.4| Australia/Brisbane|
|  7| Cranbourne|-38.0996|145.2834|Australia/Melbourne|
|  8|   Canberra|-35.2931|149.1269|   Australia/Sydney|
|  9|  Newcastle|-32.9167|  151.75|   Australia/Sydney|
| 10| Wollongong|-34.4331|150.8831|   Australia/Sydney|
| 11|    Geelong|  -38.15|  144.35|Australia/Melbourne|
| 12|     Hobart|-42.8806| 147.325|   Australia/Hobart|
| 13| Townsville|-19.2564|146.8183| Australia/Brisbane|
| 14|    Ipswich|-27.6167|152.7667| Australia/Brisbane|
| 15|     Cairns|-16.9303|145.7703| Australia/Br

2024-01-14 18:31:41,683 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 231.0 (TID 21368, rc1a-dataproc-d-5w4oxa2s5b8foehs.mdb.yandexcloud.net, executor 5): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:536)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:525)
	at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:38)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:643)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:621)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:456)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
	at scala.collection.Iterator$$anon$10.hasNext

Ниже код для прочтения всех исторических данных, но у меня это все обрабатывалось долго и неудобно было с этим работать - я ограничил данные 10-ю днями, когда все заработает, можно будет повторить процесс на всех данных:

In [35]:
#df_messages = spark.read\
#    .parquet("/user/master/data/geo/events")\
#    .where("event_type='message'")\
#    .select(F.col("event.message_from").alias("user"),
#                F.col("event.message_id").alias("message"),
#                F.col("event_type").alias("event_type"),
#                F.col("lat").alias("lat"),
#                F.col("lon").alias("lon"),
#                F.col("date").alias("date"),
#        )

2024-01-05 14:02:09,335 WARN datasources.SharedInMemoryCache: Evicting cached table partition metadata from memory due to size constraints (spark.sql.hive.filesourcePartitionFileCacheSize = 262144000 bytes). This may impact query planning performance.


Функция для прочтения данных по дням:

In [6]:
def input_paths(date, depth):
    dt = datetime.datetime.strptime(date, '%Y-%m-%d')
    return [f"/user/master/data/geo/events/date={(dt-datetime.timedelta(days=x)).strftime('%Y-%m-%d')}" for x in range(depth)]

Читаем данные:

In [7]:
date = '2022-03-05'
depth = 10

act_period = input_paths(date, depth)
messages = spark.read\
    .option("basePath", "/user/master/data/geo/events")\
    .parquet(*act_period)\
    .where("event_type='message'")

df_messages = messages\
    .where("event_type='message'")\
    .select(F.col("event.message_from").alias("user"),
                F.col("event.message_id").alias("message"),
                F.col("event_type").alias("event_type"),
                F.col("lat").alias("lat"),
                F.col("lon").alias("lon"),
                F.col("date").alias("date"),
                F.col("event.datetime").alias("TIME_UTC")
        )

                                                                                

Объединяем данные путем присвоения всех возможных координат 24-х городов к каждому сообщению:

In [9]:
def get_geo_id(geo_id_max, df):
    results = df.withColumn('geo_id', F.lit(1))
    for i in range(geo_id_max - 1):
        df_geo = df.withColumn('geo_id', F.lit(i+2))
        results = df_geo.union(results)
    return results.withColumnRenamed("lat", "lat_1")

df_geo_id = get_geo_id(24, df_messages)

1. Высчитываем расстояние, и оставляем только строки с наименьшим расстоянием rank_dist = 1, что соответствует самуму близкому центру города.
2. Группируем пользователей по дате и оставляем только строки с последней датой rank_act_city = 1, что должно соответствовать самому последнему городу, откуда было отправлено сообщение и это считаем актуальным городом.
3. Создаем группы городов в зависимости от последовательности прибывания в них и город, в котором пользователь находился 27 и более дней подряд будет считаться городом, где пользователь живет.
4. Ранжируем группы дат, соответствующие перемещению пользователя из одного города в другой и максимальное значение ранжирования должно соответствовать количеству перемещений travel_count.

In [10]:
joined_geo = df_geo_id.join(geo, df_geo_id.geo_id == geo.id, how='inner').drop(geo.id)\
    .withColumnRenamed("lat", "lat_city")\
    .withColumnRenamed("lng", "lon_city")\
    .withColumnRenamed("lat_1", "lat")

distance_df = joined_geo.withColumn('distance',
                    12742 * F.asin(
                        F.sqrt(F.pow(F.sin((F.radians(F.col('lat')) - F.radians(F.col('lat_city')))/F.lit(2)), 2)\
                    + F.cos(F.radians(F.col('lat_city'))) * F.cos(F.radians(F.col('lat')))\
                    * F.pow(F.sin((F.radians(F.col('lon')) - F.radians(F.col('lon_city')))/F.lit(2)), 2)))
                               )

window_dist = Window().partitionBy('message').orderBy('distance')

df_window_dist = distance_df.withColumn("rank_dist", F.row_number().over(window_dist))

df_act_city_msg = df_window_dist.select('user', 'message', 'date', 'city', 'TIME_UTC', 'timezone').where("rank_dist = 1")

window_date = Window().partitionBy('user').orderBy(F.asc('date'))
window_act_city = Window().partitionBy('user').orderBy(F.desc('date'))

df_window_date = df_act_city_msg.withColumn("rank_date", F.dense_rank().over(window_date))
df_window_act_city = df_act_city_msg.withColumn("rank_act_city", F.row_number().over(window_act_city))

df_act_city_date = df_window_act_city.select(F.col("user").alias("user"),
                                         F.col("city").alias("act_city"),
                                            'TIME_UTC', 'timezone')\
                                        .distinct()\
                                        .where('rank_act_city = 1')

df_grp = df_window_date.select('user', 'city', 'date', 'rank_date').distinct()\
                        .withColumn("grp_date", F.col('date') - F.col('rank_date'))


dates_users_sities = df_grp.groupBy(F.col('user'), F.col('city'), F.col('grp_date')).count()

home_city_df = dates_users_sities.select('user', F.col("city").alias("home_city"))\
                .where('count > 26')

view_1 = df_act_city_date.join(home_city_df, df_act_city_date.user == home_city_df.user, how='left')\
        .drop(home_city_df.user)

window_travel_count = Window().partitionBy('user').orderBy('grp_date')
df_window_travel_count = dates_users_sities.withColumn("travel", F.row_number().over(window_travel_count))
df_travel_count = df_window_travel_count.withColumn("travel_count", F.max("travel").over(Window.partitionBy("user")))
df_travel_count = df_travel_count.select('user', 'travel_count').distinct()

view_1_1 = view_1.join(df_travel_count, view_1.user == df_travel_count.user, how='left').drop(df_travel_count.user)

In [11]:
view_1_1.show()

[Stage 22:>                                                         (0 + 1) / 1]

+-----+-----------+-------------------+-------------------+---------+------------+
| user|   act_city|           TIME_UTC|           timezone|home_city|travel_count|
+-----+-----------+-------------------+-------------------+---------+------------+
| 2509| Cranbourne|               null|Australia/Melbourne|     null|           2|
| 2529| Cranbourne|               null|Australia/Melbourne|     null|           1|
| 5409|      Perth|               null|    Australia/Perth|     null|           3|
| 9715|     Darwin|               null|   Australia/Darwin|     null|           1|
|13460|  Newcastle|2022-03-04 03:30:02|   Australia/Sydney|     null|           1|
|15322|   Maitland|               null|   Australia/Sydney|     null|           5|
|15663|     Sydney|               null|   Australia/Sydney|     null|           3|
|17979|   Brisbane|               null| Australia/Brisbane|     null|           1|
|18628|   Brisbane|               null| Australia/Brisbane|     null|           1|
|274

                                                                                

## Создать витрину в разрезе зон

Нужно создать геослой — найти распределение атрибутов, связанных с событиями, по географическим зонам (городам). Если проанализировать этот слой, то можно понять поведение пользователей по различным регионам. 
Итак, нам нужно посчитать количество событий в конкретном городе за неделю и месяц. Значит, витрина будет содержать следующие поля:
* month — месяц расчёта;
* week — неделя расчёта;
* zone_id — идентификатор зоны (города) (не понял как реализовать по всем событиям, ведь координаты есть только у сообщений);
* week_message — количество сообщений за неделю;
* week_reaction — количество реакций за неделю;
* week_subscription — количество подписок за неделю;
* week_user — количество регистраций за неделю;
* month_message — количество сообщений за месяц;
* month_reaction — количество реакций за месяц;
* month_subscription — количество подписок за месяц;
* month_user — количество регистраций за месяц.

В этой витрине мы учитываем не только отправленные сообщения, но и другие действия — подписки, реакции, регистрации (рассчитываются по первым событиям). Пока присвойте таким событиям координаты последнего отправленного сообщения конкретного пользователя (не понятен смысл - если данных нет, то и не стоит такую процедуру проводить, ведь это некорректно).

**Прочитаем данные с реакциями:**

In [12]:
date = '2022-03-05'
depth = 10

act_period = input_paths(date, depth)
reactions = spark.read\
    .option("basePath", "/user/master/data/geo/events")\
    .parquet(*act_period)\
    .where("event_type='reaction'")


reactions.show()
reactions.printSchema()

+--------------------+----------+-------------------+------------------+----------+
|               event|event_type|                lat|               lon|      date|
+--------------------+----------+-------------------+------------------+----------+
|[,, 2022-03-05 07...|  reaction| -34.40351927336887|149.73577109555941|2022-03-05|
|[,, 2022-03-05 02...|  reaction|-34.337548200603415|151.82208788269017|2022-03-05|
|[,, 2022-03-05 06...|  reaction| -32.42359274527981| 151.8693407546899|2022-03-05|
|[,, 2022-03-05 21...|  reaction| -40.58692145613617|148.00070631827495|2022-03-05|
|[,, 2022-03-05 14...|  reaction| -26.91385555002475| 152.7788734092017|2022-03-05|
|[,, 2022-03-05 03...|  reaction| -36.88399210892872| 143.9241145053058|2022-03-05|
|[,, 2022-03-05 14...|  reaction|-41.067136060691745|147.64318158681752|2022-03-05|
|[,, 2022-03-05 09...|  reaction| -40.83507535253652|147.30911595711027|2022-03-05|
|[,, 2022-03-05 10...|  reaction|  -34.3785561468774|149.95163586950872|2022

Добавим недели и месяцы:

In [13]:
df_reactions = reactions\
    .where("event_type='reaction'")\
    .select(F.col("event.reaction_from").alias("user"),
                F.col("event.reaction_type").alias("reaction"),
                F.col("event_type").alias("event_type"),
                F.col("date").alias("date"))\
    .withColumn("month", F.month("date"))\
    .withColumn("week", F.weekofyear("date"))

df_reactions.show()

+------+--------+----------+----------+-----+----+
|  user|reaction|event_type|      date|month|week|
+------+--------+----------+----------+-----+----+
|161701|    like|  reaction|2022-03-05|    3|   9|
| 22300|    like|  reaction|2022-03-05|    3|   9|
| 74850|    like|  reaction|2022-03-05|    3|   9|
|112795|    like|  reaction|2022-03-05|    3|   9|
|149167|    like|  reaction|2022-03-05|    3|   9|
| 93858| dislike|  reaction|2022-03-05|    3|   9|
|123349|    like|  reaction|2022-03-05|    3|   9|
| 65468|    like|  reaction|2022-03-05|    3|   9|
|137735|    like|  reaction|2022-03-05|    3|   9|
| 16417|    like|  reaction|2022-03-05|    3|   9|
|168213| dislike|  reaction|2022-03-05|    3|   9|
|  2361|    like|  reaction|2022-03-05|    3|   9|
|134467|    like|  reaction|2022-03-05|    3|   9|
| 96196|    like|  reaction|2022-03-05|    3|   9|
|105420|    like|  reaction|2022-03-05|    3|   9|
| 60311|    like|  reaction|2022-03-05|    3|   9|
|149545| dislike|  reaction|202

Посчитаем количество реакций по каждому месяцу и по каждой неделе:

In [14]:
df_reactions_months = df_reactions.groupBy(F.col('month')).count().withColumnRenamed('count', 'month_reaction')

df_reactions_weeks = df_reactions.groupBy(F.col('week')).count().withColumnRenamed('count', 'week_reaction')

df_reactions_count = df_reactions.join(df_reactions_months, df_reactions.month == df_reactions_months.month, how='left')\
                    .join(df_reactions_weeks, df_reactions.week == df_reactions_weeks.week, how='left')\
                    .drop(df_reactions_months.month).drop(df_reactions_weeks.week)\
                    .select('month', 'week', 'week_reaction', 'month_reaction').distinct()

df_reactions_count.show()

                                                                                

+-----+----+-------------+--------------+
|month|week|week_reaction|month_reaction|
+-----+----+-------------+--------------+
|    3|   9|       294376|        248424|
|    2|   9|       294376|        216501|
|    2|   8|       170549|        216501|
+-----+----+-------------+--------------+



**Прочитаем данные с подписками:**

In [15]:
date = '2022-03-05'
depth = 10

act_period = input_paths(date, depth)
subscriptions = spark.read\
    .option("basePath", "/user/master/data/geo/events")\
    .parquet(*act_period)\
    .where("event_type='subscription'")


subscriptions.show()
subscriptions.printSchema()

+--------------------+------------+-------------------+------------------+----------+
|               event|  event_type|                lat|               lon|      date|
+--------------------+------------+-------------------+------------------+----------+
|[,, 2022-03-05 00...|subscription| -33.45530692004778|151.31543948420796|2022-03-05|
|[,, 2022-03-05 22...|subscription|-40.862888495361425| 147.6596654521476|2022-03-05|
|[,, 2022-03-05 22...|subscription|-33.714449927818805|151.28239873476585|2022-03-05|
|[,, 2022-03-05 16...|subscription| -26.64637836118689|153.40176733828957|2022-03-05|
|[,, 2022-03-05 13...|subscription| -37.53900430225147| 145.8137118521069|2022-03-05|
|[,, 2022-03-05 03...|subscription| -34.09196608676216| 151.4613360983361|2022-03-05|
|[,, 2022-03-05 07...|subscription| -31.29565679668916|116.80059062289207|2022-03-05|
|[,, 2022-03-05 04...|subscription|-18.805026525396517|147.02422192497784|2022-03-05|
|[,, 2022-03-05 21...|subscription| -34.18069062699671

Добавим недели и месяцы:

In [16]:
df_subscriptions = subscriptions\
    .where("event_type='subscription'")\
    .select(F.col("event.user").alias("user"),
                F.col("event.subscription_channel").alias("channel"),
                F.col("event_type").alias("event_type"),
                F.col("date").alias("date"))\
    .withColumn("month", F.month("date"))\
    .withColumn("week", F.weekofyear("date"))

df_subscriptions.show()

+------+-------+------------+----------+-----+----+
|  user|channel|  event_type|      date|month|week|
+------+-------+------------+----------+-----+----+
|121730| 604773|subscription|2022-03-05|    3|   9|
|121820| 652956|subscription|2022-03-05|    3|   9|
| 12245| 818823|subscription|2022-03-05|    3|   9|
|126012| 934801|subscription|2022-03-05|    3|   9|
|126737|  47149|subscription|2022-03-05|    3|   9|
|127358| 614046|subscription|2022-03-05|    3|   9|
|129400| 324833|subscription|2022-03-05|    3|   9|
|129478| 528251|subscription|2022-03-05|    3|   9|
|135532| 245006|subscription|2022-03-05|    3|   9|
| 13645| 203410|subscription|2022-03-05|    3|   9|
| 14190| 952262|subscription|2022-03-05|    3|   9|
|143626| 275105|subscription|2022-03-05|    3|   9|
|143915| 456879|subscription|2022-03-05|    3|   9|
|145020| 153776|subscription|2022-03-05|    3|   9|
|148207| 278093|subscription|2022-03-05|    3|   9|
|151378| 738179|subscription|2022-03-05|    3|   9|
|151385| 687

Посчитаем количество подписок по каждому месяцу и по каждой неделе:

In [17]:
df_subscriptions_months = df_subscriptions.groupBy(F.col('month')).count().withColumnRenamed('count', 'month_subscription')

df_subscriptions_weeks = df_subscriptions.groupBy(F.col('week')).count().withColumnRenamed('count', 'week_subscription')

df_subscriptions_count = df_subscriptions\
                    .join(df_subscriptions_months, df_subscriptions.month == df_subscriptions_months.month, how='left')\
                    .join(df_subscriptions_weeks, df_subscriptions.week == df_subscriptions_weeks.week, how='left')\
                    .drop(df_subscriptions_months.month).drop(df_subscriptions_weeks.week)\
                    .select('month', 'week', 'week_subscription', 'month_subscription').distinct()

df_subscriptions_count.show()

                                                                                

+-----+----+-----------------+------------------+
|month|week|week_subscription|month_subscription|
+-----+----+-----------------+------------------+
|    2|   9|           559262|            413952|
|    3|   9|           559262|            471341|
|    2|   8|           326031|            413952|
+-----+----+-----------------+------------------+



**Прочитаем данные с сообщениями:**

In [18]:
date = '2022-03-05'
depth = 10

act_period = input_paths(date, depth)
messages = spark.read\
    .option("basePath", "/user/master/data/geo/events")\
    .parquet(*act_period)\
    .where("event_type='message'")

messages.show()
messages.printSchema()

[Stage 58:>                                                         (0 + 1) / 1]

+--------------------+----------+-------------------+------------------+----------+
|               event|event_type|                lat|               lon|      date|
+--------------------+----------+-------------------+------------------+----------+
|[,,,, i dont know...|   message| -33.70134197376076|152.15214334618787|2022-03-05|
|[[43081], 989107,...|   message|-32.817112404689226|152.04366515140228|2022-03-05|
|[,,,, is this the...|   message|-16.799098293427118|146.27308763602898|2022-03-05|
|[[34265], 69433, ...|   message| -32.81332035813615|115.89214218896626|2022-03-04|
|[,,,, please,, 46...|   message| -32.21426067817561|152.17841332537947|2022-03-04|
|[[123339], 657862...|   message| -36.85292468522662|  145.705396090251|2022-03-04|
|[,,,, where can I...|   message|-41.083056615424475|147.44592025888363|2022-03-04|
|[,,,, anybody kno...|   message|-33.684325210832576|151.20298931356814|2022-03-03|
|[,,,, i have just...|   message|  -40.5104640690403|147.59769275817007|2022

                                                                                

Добавим недели и месяцы:

In [19]:
df_messages = messages\
    .where("event_type='message'")\
    .select(F.col("event.message_from").alias("user"),
                F.col("event.message_id").alias("message"),
                F.col("event_type").alias("event_type"),
                F.col("date").alias("date"))\
    .withColumn("month", F.month("date"))\
    .withColumn("week", F.weekofyear("date"))

df_messages.show()

+------+-------+----------+----------+-----+----+
|  user|message|event_type|      date|month|week|
+------+-------+----------+----------+-----+----+
|  7240| 672513|   message|2022-03-05|    3|   9|
| 43081|1117752|   message|2022-03-05|    3|   9|
| 58776| 227124|   message|2022-03-05|    3|   9|
| 34265|1091227|   message|2022-03-04|    3|   9|
| 46129| 803362|   message|2022-03-04|    3|   9|
|123339|1114280|   message|2022-03-04|    3|   9|
|135981| 168314|   message|2022-03-04|    3|   9|
| 59630|  50082|   message|2022-03-03|    3|   9|
| 61619| 699867|   message|2022-03-03|    3|   9|
|141584| 266714|   message|2022-03-03|    3|   9|
| 56006|1105146|   message|2022-03-02|    3|   9|
|149967| 974437|   message|2022-03-02|    3|   9|
|165987|1127525|   message|2022-03-02|    3|   9|
| 31760| 191443|   message|2022-03-01|    3|   9|
| 31760| 483238|   message|2022-03-01|    3|   9|
| 46832|1010661|   message|2022-03-01|    3|   9|
| 62385| 199710|   message|2022-03-01|    3|   9|


Посчитаем количество сообщений по каждому месяцу и по каждой неделе:

In [20]:
df_messages_months = df_messages.groupBy(F.col('month')).count().withColumnRenamed('count', 'month_message')

df_messages_weeks = df_messages.groupBy(F.col('week')).count().withColumnRenamed('count', 'week_message')

df_messages_count = df_messages\
                    .join(df_messages_months, df_messages.month == df_messages_months.month, how='left')\
                    .join(df_messages_weeks, df_messages.week == df_messages_weeks.week, how='left')\
                    .drop(df_messages_months.month).drop(df_messages_weeks.week)\
                    .select('month', 'week', 'week_message', 'month_message').distinct()

df_messages_count.show()

                                                                                

+-----+----+------------+-------------+
|month|week|week_message|month_message|
+-----+----+------------+-------------+
|    2|   9|       33905|        28057|
|    2|   8|       22442|        28057|
|    3|   9|       33905|        28290|
+-----+----+------------+-------------+



1. Объединим данные по сообщениям, реакциям и подпискам, чтобы найти самое раннее событие для пользователя - это будет считаться регистрацией.
2. Посчитаем количество регистраций по месяцам и по неделям.

In [21]:
df_users = df_reactions.select('user', 'event_type', 'date', 'month', 'week')\
            .union(df_subscriptions.select('user', 'event_type', 'date', 'month', 'week'))\
            .union(df_messages.select('user', 'event_type', 'date', 'month', 'week')).distinct()

window_users = Window().partitionBy('user').orderBy('date')

df_window_users_reg = df_users.withColumn("rank_date", F.row_number().over(window_users))

df_users_reg = df_window_users_reg.select('user', 'month', 'week').where("rank_date = 1")

df_users_months = df_users_reg.groupBy(F.col('month')).count().withColumnRenamed('count', 'month_user')

df_users_weeks = df_users_reg.groupBy(F.col('week')).count().withColumnRenamed('count', 'week_user')

df_users_count = df_users_reg\
                    .join(df_users_months, df_users_reg.month == df_users_months.month, how='left')\
                    .join(df_users_weeks, df_users_reg.week == df_users_weeks.week, how='left')\
                    .drop(df_users_months.month).drop(df_users_weeks.week)\
                    .select('month', 'week', 'week_user', 'month_user').distinct()

df_users_count.show()

                                                                                

+-----+----+---------+----------+
|month|week|week_user|month_user|
+-----+----+---------+----------+
|    2|   9|    14024|    160950|
|    3|   9|    14024|      8100|
|    2|   8|   155026|    160950|
+-----+----+---------+----------+



Объединим все полученные таблицы по количеству событий и получим необходимую витрину:

In [22]:
cond_messages = [df_users_count.month == df_messages_count.month, df_users_count.week == df_messages_count.week]
cond_subscriptions = [df_users_count.month == df_subscriptions_count.month, df_users_count.week == df_subscriptions_count.week]
cond_reactions = [df_users_count.month == df_reactions_count.month, df_users_count.week == df_reactions_count.week]

In [23]:
view_2 = df_users_count\
        .join(df_messages_count, cond_messages)\
        .join(df_subscriptions_count, cond_subscriptions)\
        .join(df_reactions_count, cond_reactions)\
        .drop(df_messages_count.month).drop(df_messages_count.week)\
        .drop(df_subscriptions_count.month).drop(df_subscriptions_count.week)\
        .drop(df_reactions_count.month).drop(df_reactions_count.week)

view_2.show()

                                                                                

+-----+----+---------+----------+------------+-------------+-----------------+------------------+-------------+--------------+
|month|week|week_user|month_user|week_message|month_message|week_subscription|month_subscription|week_reaction|month_reaction|
+-----+----+---------+----------+------------+-------------+-----------------+------------------+-------------+--------------+
|    2|   9|    14024|    160950|       33905|        28057|           559262|            413952|       294376|        216501|
|    3|   9|    14024|      8100|       33905|        28290|           559262|            471341|       294376|        248424|
|    2|   8|   155026|    160950|       22442|        28057|           326031|            413952|       170549|        216501|
+-----+----+---------+----------+------------+-------------+-----------------+------------------+-------------+--------------+



                                                                                

## Построить витрину для рекомендации друзей

Как будет работать рекомендация друзей: если пользователи подписаны на один канал, ранее никогда не переписывались и расстояние между ними не превышает 1 км, то им обоим будет предложено добавить другого в друзья. Образовывается парный атрибут, который обязан быть уникальным: порядок упоминания не должен создавать дубли пар.
Витрина будет содержать следующие атрибуты:
* user_left — первый пользователь;
* user_right — второй пользователь;
* processed_dttm — дата расчёта витрины (не реализовал, не пойму как);
* zone_id — идентификатор зоны (города);
* local_time — локальное время (не уверен, что реализовал правильно).

Добавим в первую витрину информацию о подписках:

In [24]:
view_3 = view_1_1.join(df_subscriptions, view_1_1.user == df_subscriptions.user)\
                    .drop(df_subscriptions.user)

Создадим копию полученной таблицы и объединим их с условием, что город и группа подписки совпадает, но пользователь другой - таким образом у нас произойдет объединение по пользователям, которых можно рекомендовать друг другу.

In [26]:
view_3_copy = view_3.withColumnRenamed("user", "user_right")

cond_view_3 = [view_3.act_city == view_3_copy.act_city,
               view_3.channel == view_3_copy.channel,
               view_3.user != view_3_copy.user_right]

view_3_1 = view_3.join(view_3_copy, cond_view_3)\
        .drop(view_3_copy.act_city)\
        .drop(view_3_copy.TIME_UTC)\
        .drop(view_3_copy.timezone)\
        .drop(view_3_copy.home_city)\
        .drop(view_3_copy.travel_count)

view_3_2 = view_3_1.select('user', 'user_right', 'act_city', 'timezone')\
            .withColumnRenamed("user", "user_left")\
            .withColumnRenamed("act_city", "zone_id")\
            .withColumnRenamed("timezone", "local_time")

view_3_2.show()



+---------+----------+--------+-------------------+
|user_left|user_right| zone_id|         local_time|
+---------+----------+--------+-------------------+
|   104017|    129061|Adelaide| Australia/Adelaide|
|   129061|    104017|Adelaide| Australia/Adelaide|
|    85592|    120651|Adelaide| Australia/Adelaide|
|    85592|    147175|Adelaide| Australia/Adelaide|
|   120651|     85592|Adelaide| Australia/Adelaide|
|   120651|    147175|Adelaide| Australia/Adelaide|
|   147175|    120651|Adelaide| Australia/Adelaide|
|   147175|     85592|Adelaide| Australia/Adelaide|
|    63399|    141465|Adelaide| Australia/Adelaide|
|   141465|     63399|Adelaide| Australia/Adelaide|
|    36868|    104875|Adelaide| Australia/Adelaide|
|   104875|     36868|Adelaide| Australia/Adelaide|
|   138054|     24264|Adelaide| Australia/Adelaide|
|    24264|    138054|Adelaide| Australia/Adelaide|
|    79678|    117504|Adelaide| Australia/Adelaide|
|   117504|     79678|Adelaide| Australia/Adelaide|
|   154756| 

                                                                                

**Если дочитал до конца, спасибо! Сам знаю, что очень много недоработок, но с учетом, что инфраструктура работает с перебоями времени не хватило, да и желание этим заниматься в таких условиях пропало совсем.**