In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
# Local Spark session for this notebook. The driver bind address and host are
# both set to 127.0.0.1 and the UI port is set explicitly.
_session_options = {
    "spark.driver.bindAddress": "127.0.0.1",
    "spark.driver.host": "127.0.0.1",
    "spark.ui.port": "4046",
    "spark.driver.extraJavaOptions": "-Djava.net.preferIPv4Stack=true",
}

_builder = SparkSession.builder.appName("wsl-ui-test").master("local[*]")
for _option, _setting in _session_options.items():
    _builder = _builder.config(_option, _setting)

spark = _builder.getOrCreate()

# Bare expression so the notebook renders the session object.
spark

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/12/05 13:05:16 WARN Utils: Your hostname, DESKTOP-VV6FGUQ, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/12/05 13:05:16 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/05 13:05:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Load the employee CSV: the first line is the header and column types are inferred.
df = spark.read.csv(
    "/home/ilyasius/pyspark_projects/data_emp.csv",
    header=True,
    inferSchema=True,
)

In [2]:
from tabulate import *

In [5]:
def git(df, n=100, fmt="fancy_grid"):
    """Print the first ``n`` rows of a DataFrame as a ``tabulate`` table.

    Args:
        df: DataFrame-like object exposing ``limit``, ``collect`` and ``columns``.
        n: Maximum number of rows to fetch and print.
        fmt: Value passed to ``tabulate`` as ``tablefmt``.

    Returns:
        None. The rendered table is written to standard output.
    """
    # Only the limited slice is collected, never the whole frame.
    rows = df.limit(n).collect()
    table = tabulate(rows, headers=df.columns, tablefmt=fmt, showindex=False)
    print(table)

In [6]:
# Preview the first 10 rows, then print the column names and types.
git(df, n=10)
df.printSchema()

╒═══════════════╤══════════════╤═════════════╤═════════════════════════╤══════════════╤══════════╤════════════════╤═══════╕
│   employee_id │ first_name   │ last_name   │ email                   │ department   │   salary │ joining_date   │   age │
╞═══════════════╪══════════════╪═════════════╪═════════════════════════╪══════════════╪══════════╪════════════════╪═══════╡
│             1 │ Joshua       │ Ramos       │ alexandra29@gmail.com   │ Operations   │  65313.8 │ 23-05-2024     │    42 │
├───────────────┼──────────────┼─────────────┼─────────────────────────┼──────────────┼──────────┼────────────────┼───────┤
│             2 │ Christina    │ Clark       │ littlekyle@yahoo.com    │ IT           │  58827.3 │ 02-10-2021     │    46 │
├───────────────┼──────────────┼─────────────┼─────────────────────────┼──────────────┼──────────┼────────────────┼───────┤
│             3 │ Jonathon     │ Sullivan    │ tannerivan@johnson.com  │ Operations   │  57427.3 │ 30-04-2020     │    50 │
├───────

In [7]:
from pyspark.sql.types import *
from datetime import *

# Event rows (user, event name, value, timestamp) with `ts` stored as a real timestamp.
schema = (
    StructType()
    .add("user_id", IntegerType(), True)
    .add("event", StringType(), True)
    .add("value", IntegerType(), True)
    .add("ts", TimestampType(), True)
)

# Timestamps are written as text below and parsed into datetime objects here.
data = [
    (user_id, event, value, datetime.strptime(ts_text, "%Y-%m-%d %H:%M"))
    for user_id, event, value, ts_text in [
        (1, "click", 10, "2023-01-01 10:00"),
        (1, "click", 20, "2023-01-01 10:05"),
        (1, "buy", 100, "2023-01-01 10:10"),
        (2, "click", 5, "2023-01-01 11:00"),
        (2, "buy", 50, "2023-01-01 11:02"),
    ]
]

df_1 = spark.createDataFrame(data, schema)

In [8]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window


In [9]:
# Display the contents of df_1.
df_1.show()

                                                                                

+-------+-----+-----+-------------------+
|user_id|event|value|                 ts|
+-------+-----+-----+-------------------+
|      1|click|   10|2023-01-01 10:00:00|
|      1|click|   20|2023-01-01 10:05:00|
|      1|  buy|  100|2023-01-01 10:10:00|
|      2|click|    5|2023-01-01 11:00:00|
|      2|  buy|   50|2023-01-01 11:02:00|
+-------+-----+-----+-------------------+



In [10]:
# Number of rows per user, with one column per distinct `event` value.
# Note: this rebinds `df`, which until now held the CSV frame.
df = (
    df_1
    .groupBy('user_id')
    .pivot('event')
    .count()
)
git(df)

╒═══════════╤═══════╤═════════╕
│   user_id │   buy │   click │
╞═══════════╪═══════╪═════════╡
│         1 │     1 │       2 │
├───────────┼───────┼─────────┤
│         2 │     1 │       1 │
╘═══════════╧═══════╧═════════╛


In [11]:
# Orders per user. `ts` is loaded as text and cast to a timestamp afterwards.
schema1 = StructType(
    [
        StructField(field_name, field_type, True)
        for field_name, field_type in (
            ("user_id", IntegerType()),
            ("order_id", IntegerType()),
            ("amount", IntegerType()),
            ("ts", StringType()),
        )
    ]
)

data1 = [
    (1, 101, 10, "2023-01-01 10:00"),
    (1, 102, 20, "2023-01-01 10:05"),
    (1, 103, 30, "2023-01-01 10:10"),
    (2, 201, 50, "2023-01-01 11:00"),
    (2, 202, 70, "2023-01-01 11:05"),
]

per = spark.createDataFrame(data1, schema1)

# Replace the text column with its timestamp-typed version.
df_2 = per.withColumn("ts", col("ts").cast("timestamp"))

In [12]:
# Display the contents of df_2.
df_2.show()

+-------+--------+------+-------------------+
|user_id|order_id|amount|                 ts|
+-------+--------+------+-------------------+
|      1|     101|    10|2023-01-01 10:00:00|
|      1|     102|    20|2023-01-01 10:05:00|
|      1|     103|    30|2023-01-01 10:10:00|
|      2|     201|    50|2023-01-01 11:00:00|
|      2|     202|    70|2023-01-01 11:05:00|
+-------+--------+------+-------------------+



In [13]:
# Cumulative `amount` per user in `ts` order.
# `sum` here is the Spark column function brought in by the star import, not the builtin.
w = Window.partitionBy('user_id').orderBy('ts')
df = df_2.select('*', sum('amount').over(w).alias('total_pay'))
git(df)

╒═══════════╤════════════╤══════════╤═════════════════════╤═════════════╕
│   user_id │   order_id │   amount │ ts                  │   total_pay │
╞═══════════╪════════════╪══════════╪═════════════════════╪═════════════╡
│         1 │        101 │       10 │ 2023-01-01 10:00:00 │          10 │
├───────────┼────────────┼──────────┼─────────────────────┼─────────────┤
│         1 │        102 │       20 │ 2023-01-01 10:05:00 │          30 │
├───────────┼────────────┼──────────┼─────────────────────┼─────────────┤
│         1 │        103 │       30 │ 2023-01-01 10:10:00 │          60 │
├───────────┼────────────┼──────────┼─────────────────────┼─────────────┤
│         2 │        201 │       50 │ 2023-01-01 11:00:00 │          50 │
├───────────┼────────────┼──────────┼─────────────────────┼─────────────┤
│         2 │        202 │       70 │ 2023-01-01 11:05:00 │         120 │
╘═══════════╧════════════╧══════════╧═════════════════════╧═════════════╛


In [14]:
# Login/click/logout events per user. `ts` is loaded as text and cast afterwards.
schema2 = (
    StructType()
    .add("user_id", IntegerType(), True)
    .add("event", StringType(), True)
    .add("ts", StringType(), True)
)

data2 = [
    (1, "login", "2023-01-01 10:00"),
    (1, "click", "2023-01-01 10:01"),
    (1, "logout", "2023-01-01 10:05"),
    (2, "login", "2023-01-01 11:00"),
    (2, "logout", "2023-01-01 11:02"),
]

per1 = spark.createDataFrame(data2, schema2)

# Replace the text column with its timestamp-typed version.
df_3 = per1.withColumn("ts", col("ts").cast("timestamp"))

df_3.show()

+-------+------+-------------------+
|user_id| event|                 ts|
+-------+------+-------------------+
|      1| login|2023-01-01 10:00:00|
|      1| click|2023-01-01 10:01:00|
|      1|logout|2023-01-01 10:05:00|
|      2| login|2023-01-01 11:00:00|
|      2|logout|2023-01-01 11:02:00|
+-------+------+-------------------+



In [15]:
# Session length in seconds: logout time minus the directly preceding login time.
df_filter = df_3.filter(col('event').isin('login', 'logout'))

w = Window.partitionBy('user_id').orderBy('ts')

df_result = (
    df_filter
    .withColumn('lag_event', lag('event').over(w))
    .withColumn('lag_ts', lag('ts').over(w))
    # Keep only login -> logout pairs. Without this, the gap between a logout
    # and the user's next login (or between two logins) would also be
    # reported as a session.
    .filter((col('event') == 'logout') & (col('lag_event') == 'login'))
    # Casting a timestamp to long gives epoch seconds, so the difference is in seconds.
    .withColumn('time_session', col('ts').cast('long') - col('lag_ts').cast('long'))
    .select('user_id', 'time_session')
    .dropna()
)
git(df_result)

╒═══════════╤════════════════╕
│   user_id │   time_session │
╞═══════════╪════════════════╡
│         1 │            300 │
├───────────┼────────────────┤
│         2 │            120 │
╘═══════════╧════════════════╛


In [16]:
# Lookup table: user -> country.
schema3 = (
    StructType()
    .add("user_id", IntegerType(), True)
    .add("country", StringType(), True)
)
data3 = [(1, "US"), (2, "UK"), (3, "US")]
df_country = spark.createDataFrame(data3, schema3)


# Transactions per user. `ts` is loaded as text and cast to a date.
schema4 = (
    StructType()
    .add("user_id", IntegerType(), True)
    .add("amount", IntegerType(), True)
    .add("ts", StringType(), True)
)
data4 = [
    (1, 100, "2026-01-01"),
    (2, 50, "2026-01-02"),
    (1, 150, "2026-01-03"),
    (3, 200, "2023-01-04"),
]
df_transactions = (
    spark.createDataFrame(data4, schema4)
    .withColumn("ts", col('ts').cast('date'))
)


df_country.show()
df_transactions.show()


+-------+-------+
|user_id|country|
+-------+-------+
|      1|     US|
|      2|     UK|
|      3|     US|
+-------+-------+

+-------+------+----------+
|user_id|amount|        ts|
+-------+------+----------+
|      1|   100|2026-01-01|
|      2|    50|2026-01-02|
|      1|   150|2026-01-03|
|      3|   200|2023-01-04|
+-------+------+----------+



In [17]:
# Average transaction amount per country over rows dated after (today - 30 days).
# NOTE(review): the filter has no upper bound, so rows dated in the future
# also pass it — confirm this is intended.
_cutoff = date_sub(current_date(), 30)
df_fl = df_transactions.where(col('ts') > _cutoff)

df_result = (
    df_fl
    .join(df_country, on='user_id', how='inner')
    .groupBy('country')
    .agg(avg('amount').alias('avg_amount_country_30_day'))
)

git(df_result)

╒═══════════╤═════════════════════════════╕
│ country   │   avg_amount_country_30_day │
╞═══════════╪═════════════════════════════╡
│ US        │                         125 │
├───────────┼─────────────────────────────┤
│ UK        │                          50 │
╘═══════════╧═════════════════════════════╛


In [96]:
# Click/buy/refund events per user. `ts` is loaded as text and parsed afterwards.
schema = StructType(
    [
        StructField(field_name, field_type, True)
        for field_name, field_type in (
            ("user_id", IntegerType()),
            ("event", StringType()),
            ("value", IntegerType()),
            ("ts", StringType()),
        )
    ]
)

# Source data
data = [
    (1, "click", 10, "2023-01-01 10:00"),
    (1, "buy", 100, "2023-01-01 10:10"),
    (1, "refund", 50, "2023-01-01 10:15"),
    (2, "click", 5, "2023-01-01 11:00"),
    (2, "buy", 200, "2023-01-01 11:05"),
    (2, "refund", 20, "2023-01-01 11:10"),
]

# Build the frame and parse `ts` with an explicit pattern in one chain.
df = (
    spark.createDataFrame(data, schema)
    .withColumn("ts", to_timestamp("ts", "yyyy-MM-dd HH:mm"))
)

df.show()


+-------+------+-----+-------------------+
|user_id| event|value|                 ts|
+-------+------+-----+-------------------+
|      1| click|   10|2023-01-01 10:00:00|
|      1|   buy|  100|2023-01-01 10:10:00|
|      1|refund|   50|2023-01-01 10:15:00|
|      2| click|    5|2023-01-01 11:00:00|
|      2|   buy|  200|2023-01-01 11:05:00|
|      2|refund|   20|2023-01-01 11:10:00|
+-------+------+-----+-------------------+



In [101]:
# One row per user, one column per event value; each cell counts the matching rows.
(
    df
    .groupBy('user_id')
    .pivot('event')
    .agg(count("user_id"))
    .show()
)

+-------+---+-----+------+
|user_id|buy|click|refund|
+-------+---+-----+------+
|      1|  1|    1|     1|
|      2|  1|    1|     1|
+-------+---+-----+------+



In [20]:
# Purchases per user. `ts` is loaded as text and parsed afterwards.
schema = (
    StructType()
    .add("user_id", IntegerType(), True)
    .add("amount", IntegerType(), True)
    .add("ts", StringType(), True)
)

data = [
    (1, 10, "2023-01-01 10:00"),
    (1, 15, "2023-01-01 10:05"),
    (1, 20, "2023-01-01 10:10"),
    (2, 5, "2023-01-02 09:00"),
    (2, 7, "2023-01-02 09:10"),
]

# Build the frame and parse `ts` with an explicit pattern in one chain.
df = (
    spark.createDataFrame(data, schema)
    .withColumn("ts", to_timestamp("ts", "yyyy-MM-dd HH:mm"))
)

df.show()

+-------+------+-------------------+
|user_id|amount|                 ts|
+-------+------+-------------------+
|      1|    10|2023-01-01 10:00:00|
|      1|    15|2023-01-01 10:05:00|
|      1|    20|2023-01-01 10:10:00|
|      2|     5|2023-01-02 09:00:00|
|      2|     7|2023-01-02 09:10:00|
+-------+------+-------------------+



In [21]:
# Per-user summary: first and last purchase amount (by `ts`), total amount
# and number of purchases.
w = Window.partitionBy('user_id').orderBy('ts')
w1 = Window.partitionBy('user_id').orderBy(desc('ts'))

# rn == 1 marks the earliest row of each user, rn2 == 1 the latest.
# The chains are parenthesised rather than ended with a trailing backslash,
# so a statement added below cannot be joined onto the expression by accident.
df_create = (
    df
    .withColumn('rn', row_number().over(w))
    .withColumn('rn2', row_number().over(w1))
)

df_rn = (
    df_create
    .filter(col('rn') == 1)
    .select('user_id', col('amount').alias('first_amount'))
)

df_rn2 = (
    df_create
    .filter(col('rn2') == 1)
    .select('user_id', col('amount').alias('last_amount'))
)

df_sum = (
    df
    .groupBy('user_id')
    .agg(
        sum('amount').alias('sum_amount'),
        count("*").alias('purchase_count'),
    )
)

# Stitch the three per-user frames together on the shared key.
df_result = (
    df_rn
    .join(df_rn2, on='user_id', how='inner')
    .join(df_sum, on='user_id', how='inner')
)

df_result.show()

[Stage 69:>                                                       (0 + 12) / 12]

+-------+------------+-----------+----------+--------------+
|user_id|first_amount|last_amount|sum_amount|purchase_count|
+-------+------------+-----------+----------+--------------+
|      1|          10|         20|        45|             3|
|      2|           5|          7|        12|             2|
+-------+------------+-----------+----------+--------------+



                                                                                

In [22]:
# Orders where `items` is an array of {item_id, price} structs.
_item_type = StructType([
    StructField("item_id", IntegerType(), True),
    StructField("price", IntegerType(), True),
])

schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("user_id", IntegerType(), True),
    StructField("items", ArrayType(_item_type), True),
])

# Compact (order_id, user_id, [(item_id, price), ...]) rows, expanded into
# the dict records handed to Spark.
_orders = [
    (1, 10, [(100, 50), (101, 70)]),
    (2, 20, [(100, 50)]),
    (3, 10, [(103, 120), (104, 90)]),
]

data = [
    {
        "order_id": order_id,
        "user_id": user_id,
        "items": [
            {"item_id": item_id, "price": price}
            for item_id, price in items
        ],
    }
    for order_id, user_id, items in _orders
]

df = spark.createDataFrame(data, schema)

df.show(truncate=False)


+--------+-------+-----------------------+
|order_id|user_id|items                  |
+--------+-------+-----------------------+
|1       |10     |[{100, 50}, {101, 70}] |
|2       |20     |[{100, 50}]            |
|3       |10     |[{103, 120}, {104, 90}]|
+--------+-------+-----------------------+



In [23]:
# One output row per item: explode the array, then lift the struct fields into
# top-level columns and stamp each row with the current timestamp.
(
    df
    .select('order_id', 'user_id', explode('items').alias('item'))
    .select(
        'order_id',
        'user_id',
        col('item.item_id').alias('item_id'),
        col('item.price').alias('price'),
        current_timestamp().alias('cur'),
    )
    .show(truncate=False)
)
    

+--------+-------+-------+-----+--------------------------+
|order_id|user_id|item_id|price|cur                       |
+--------+-------+-------+-----+--------------------------+
|1       |10     |100    |50   |2025-12-05 13:06:00.670271|
|1       |10     |101    |70   |2025-12-05 13:06:00.670271|
|2       |20     |100    |50   |2025-12-05 13:06:00.670271|
|3       |10     |103    |120  |2025-12-05 13:06:00.670271|
|3       |10     |104    |90   |2025-12-05 13:06:00.670271|
+--------+-------+-------+-----+--------------------------+



In [90]:
# Click events per user. `ts` is loaded as text and parsed afterwards.
schema = (
    StructType()
    .add("user_id", IntegerType(), True)
    .add("event", StringType(), True)
    .add("ts", StringType(), True)
)

# Every row is a "click"; only the user and the time vary.
data = [
    (user_id, "click", ts_text)
    for user_id, ts_text in [
        (1, "2023-01-01 10:00"),
        (1, "2023-01-01 10:05"),
        (1, "2023-01-01 10:40"),
        (1, "2023-01-01 10:55"),
        (2, "2023-01-01 11:00"),
        (2, "2023-01-01 11:20"),
        (2, "2023-01-01 12:00"),
    ]
]

df = (
    spark.createDataFrame(data, schema)
    .withColumn("ts", to_timestamp("ts", "yyyy-MM-dd HH:mm"))
)


In [91]:
# Display the current contents of df.
df.show()

+-------+-----+-------------------+
|user_id|event|                 ts|
+-------+-----+-------------------+
|      1|click|2023-01-01 10:00:00|
|      1|click|2023-01-01 10:05:00|
|      1|click|2023-01-01 10:40:00|
|      1|click|2023-01-01 10:55:00|
|      2|click|2023-01-01 11:00:00|
|      2|click|2023-01-01 11:20:00|
|      2|click|2023-01-01 12:00:00|
+-------+-----+-------------------+



In [26]:
# Split each user's events into sessions: a new session starts on the user's
# first event or when more than 1800 seconds have passed since the previous one.
w = Window.partitionBy('user_id').orderBy('ts')

_gap_seconds = col('ts').cast('long') - col('lag_ts').cast('long')
_opens_session = col('lag_ts').isNull() | (_gap_seconds > 1800)

df1 = (
    df
    .withColumn('lag_ts', lag('ts').over(w))
    # 1 on the row that opens a session, 0 otherwise ...
    .withColumn('enumerate', when(_opens_session, 1).otherwise(0))
    # ... so the running sum numbers the sessions within each user.
    .withColumn('enumerate', sum('enumerate').over(w))
    .drop('lag_ts')
)

w1 = Window.partitionBy('user_id', 'enumerate')

# Session duration in minutes (difference of epoch seconds divided by 60).
_duration_minutes = (
    col('end_session').cast('long') - col('start_session').cast('long')
) / 60

df2 = (
    df1
    .withColumn('start_session', min('ts').over(w1))
    .withColumn('end_session', max('ts').over(w1))
    .withColumn('time_session', _duration_minutes)
    .drop('event', 'ts')
    # All rows of a session now carry identical values; keep one per session.
    .distinct()
)

df2.show()

+-------+---------+-------------------+-------------------+------------+
|user_id|enumerate|      start_session|        end_session|time_session|
+-------+---------+-------------------+-------------------+------------+
|      1|        1|2023-01-01 10:00:00|2023-01-01 10:05:00|         5.0|
|      1|        2|2023-01-01 10:40:00|2023-01-01 10:55:00|        15.0|
|      2|        1|2023-01-01 11:00:00|2023-01-01 11:20:00|        20.0|
|      2|        2|2023-01-01 12:00:00|2023-01-01 12:00:00|         0.0|
+-------+---------+-------------------+-------------------+------------+



In [104]:
# Activity dates per user.
# The rows carry `date` as text, so the schema declares it as a string and the
# column is converted to a date right after the frame is created. Previously a
# DateType schema was declared but never passed to createDataFrame. The types
# used here (long, string) are the ones Spark inferred from the Python values.
schema = StructType([
    StructField("user_id", LongType(), True),
    StructField("date", StringType(), True)
])

data = [
    (1, "2024-01-01"),
    (1, "2024-01-02"),
    (1, "2024-01-03"),
    (1, "2024-01-10"),
    (2, "2024-02-01"),
    (2, "2024-02-03"),
    (2, "2024-02-04")
]

df = spark.createDataFrame(data, schema)
df = df.withColumn("date", to_date("date"))
git(df)

╒═══════════╤════════════╕
│   user_id │ date       │
╞═══════════╪════════════╡
│         1 │ 2024-01-01 │
├───────────┼────────────┤
│         1 │ 2024-01-02 │
├───────────┼────────────┤
│         1 │ 2024-01-03 │
├───────────┼────────────┤
│         1 │ 2024-01-10 │
├───────────┼────────────┤
│         2 │ 2024-02-01 │
├───────────┼────────────┤
│         2 │ 2024-02-03 │
├───────────┼────────────┤
│         2 │ 2024-02-04 │
╘═══════════╧════════════╛


In [105]:
# Group each user's dates into runs of consecutive days and count the rows in each run.
w = Window.partitionBy('user_id').orderBy('date')

_day_gap = datediff(col('date'), col('lag_date'))
_starts_run = col('lag_date').isNull() | (_day_gap > 1)

df1 = (
    df
    .withColumn('lag_date', lag('date').over(w))
    # 1 on the row that starts a run, 0 otherwise ...
    .withColumn('enumerate', when(_starts_run, 1).otherwise(0))
    # ... so the running sum numbers the runs within each user.
    .withColumn('enumerate', sum('enumerate').over(w))
    .drop('lag_date', 'date')
)

w1 = Window.partitionBy('user_id', 'enumerate')

# Row count per run, then one row per (user, run).
df_result = (
    df1
    .withColumn('time_active', count("*").over(w1))
    .distinct()
)

df_result.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[user_id#911L, enumerate#916L, time_active#918L], functions=[])
   +- HashAggregate(keys=[user_id#911L, enumerate#916L, time_active#918L], functions=[])
      +- Window [count(1) windowspecdefinition(user_id#911L, enumerate#916L, specifiedwindowframe(RowFrame, unboundedpreceding$(), unboundedfollowing$())) AS time_active#918L], [user_id#911L, enumerate#916L]
         +- Sort [user_id#911L ASC NULLS FIRST, enumerate#916L ASC NULLS FIRST], false, 0
            +- Project [user_id#911L, enumerate#916L]
               +- Window [sum(enumerate#915) windowspecdefinition(user_id#911L, date#913 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS enumerate#916L], [user_id#911L], [date#913 ASC NULLS FIRST]
                  +- Project [user_id#911L, date#913, CASE WHEN (isnull(lag_date#914) OR (datediff(date#913, lag_date#914) > 1)) THEN 1 ELSE 0 END AS enumerate#915]
           

In [93]:
# Small frame in which some scores are missing.
_score_rows = [
    (1, 10),
    (2, None),
    (3, 5),
    (4, None),
]
df = spark.createDataFrame(_score_rows, schema=['id', 'score'])

df.show()

+---+-----+
| id|score|
+---+-----+
|  1|   10|
|  2| NULL|
|  3|    5|
|  4| NULL|
+---+-----+



In [94]:
# Replace missing scores with the mean of the present ones.
# `avg` ignores nulls and yields None when no score is present at all.
avgg = df.select(avg('score')).take(1)[0][0]

# Fill only the `score` column: a bare fillna(value) would target every
# compatible column, not just the one the mean was computed from.
# The guard skips the fill when there is no mean to use.
# Note: the recorded output shows 7 although the mean of 10 and 5 is 7.5, so
# the fill value ends up as a whole number in this integer-typed column.
if avgg is not None:
    df = df.fillna({'score': avgg})

git(df)

╒══════╤═════════╕
│   id │   score │
╞══════╪═════════╡
│    1 │      10 │
├──────┼─────────┤
│    2 │       7 │
├──────┼─────────┤
│    3 │       5 │
├──────┼─────────┤
│    4 │       7 │
╘══════╧═════════╛


In [106]:
# Print the parsed, analyzed, optimized and physical plans of whatever `df` currently is.
# NOTE(review): the recorded output below is the plan of the user_id/date frame,
# not of the id/score frame from the preceding cell — the cells were run out of order.
df.explain(True)

== Parsed Logical Plan ==
'Project [unresolvedstarwithcolumns(date, 'to_date('date), None)]
+- LogicalRDD [user_id#911L, date#912], false

== Analyzed Logical Plan ==
user_id: bigint, date: date
Project [user_id#911L, to_date(date#912, None, Some(Asia/Yekaterinburg), true) AS date#913]
+- LogicalRDD [user_id#911L, date#912], false

== Optimized Logical Plan ==
Project [user_id#911L, cast(date#912 as date) AS date#913]
+- LogicalRDD [user_id#911L, date#912], false

== Physical Plan ==
*(1) Project [user_id#911L, cast(date#912 as date) AS date#913]
+- *(1) Scan ExistingRDD[user_id#911L,date#912]

