# Outline

1. Working with time
2. Window functions

## Validate environment

In [1]:
import sys
print(sys.version_info)

sys.version_info(major=3, minor=6, micro=2, releaselevel='final', serial=0)


In [2]:
import pyspark
print(pyspark.__version__)

2.1.1+hadoop2.7


## Create SparkSession

In [3]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

import pyspark.sql.functions as F
from pyspark.sql import types

In [4]:
spark_config = SparkConf().setMaster("local[*]").setAppName("Spark SQL overview")
spark = SparkSession.Builder().config(conf=spark_config).getOrCreate()

In [5]:
spark._sc.getConf().getAll()

[('hive.metastore.warehouse.dir', 'file:/home/jovyan/spark-warehouse'),
 ('spark.driver.host', '172.17.0.2'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.driver.port', '39223'),
 ('spark.master', 'local[*]'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.app.id', 'local-1597233928064'),
 ('spark.app.name', 'Spark SQL overview')]

In [6]:
spark._sc.parallelize(range(1000)).getNumPartitions()

4

# Load tabular datasets

Logs dataset: https://drive.google.com/drive/u/0/folders/1newx8_j2QU49dI8ogv-eWhvOylLpx7BO

In [7]:
! head -3 logsM.txt

33.49.147.163	20140101014611	http://news.rambler.ru/3105700	378	431	Safari/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0; .NET CLR 3.5.30729;)
197.72.248.141	20140101020306	http://news.mail.ru/6344933	1412	203	Safari/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0; .NET CLR 3.5.30729; .NET CLR 3.0.30729;
33.49.147.163	20140101023103	http://lenta.ru/4303000	1189	451	Chrome/5.0 (compatible; MSIE 9.0; Windows NT 6.1; Win64; x64; Trident/5.0)


In [8]:
log_schema = types.StructType(fields=[
    types.StructField("ip", types.StringType()),
    types.StructField("timestamp", types.LongType()),
    types.StructField("url", types.StringType()),
    types.StructField("size", types.IntegerType()),
    types.StructField("code", types.IntegerType()),
    types.StructField("ua", types.StringType()),
])

In [9]:
log_df = spark.read.csv("logsM.txt", sep="\t", schema=log_schema).cache()

In [10]:
log_df = log_df.repartition(4)

In [11]:
! head -5 ipDataM.txt

49.105.15.79	Komi
110.91.102.196	Chelyabinsk Oblast
56.167.169.126	Saint Petersburg
75.208.40.166	Ulyanovsk Oblast
168.255.93.197	Irkutsk Oblast


In [12]:
ip_schema = types.StructType(fields=[
    types.StructField("ip", types.StringType()),
    types.StructField("region", types.StringType()),
])

In [13]:
ips_df = spark.read.csv("ipDataM.txt", sep="\t", schema=ip_schema).cache()

In [14]:
log_with_regions = log_df.join(ips_df, on="ip", how="inner")

In [15]:
log_with_regions = log_with_regions.repartition(4).cache()

In [16]:
log_with_domains = (
    log_with_regions
    .withColumn("domain", F.regexp_extract("url", "http:\/\/(.*)\/", 1))
)
log_with_domains.show(3)

+------------+--------------+--------------------+----+----+--------------------+--------------+------------+
|          ip|     timestamp|                 url|size|code|                  ua|        region|      domain|
+------------+--------------+--------------------+----+----+--------------------+--------------+------------+
|3.183.113.77|20140102183041|http://news.mail....| 864| 413|Firefox/5.0 (Wind...|Ivanovo Oblast|news.mail.ru|
|3.183.113.77|20140102183041|http://news.mail....| 864| 413|Firefox/5.0 (Wind...|      Dagestan|news.mail.ru|
|3.183.113.77|20140102183041|http://news.mail....| 864| 413|Firefox/5.0 (Wind...|  Kirov Oblast|news.mail.ru|
+------------+--------------+--------------------+----+----+--------------------+--------------+------------+
only showing top 3 rows



## Working with time

Let us count number of days users visited our site

In [17]:
log_with_domains[["timestamp"]].show(3)

+--------------+
|     timestamp|
+--------------+
|20140102183041|
|20140102183041|
|20140102183041|
+--------------+
only showing top 3 rows



In [18]:
log_with_domains.withColumn("ts", F.unix_timestamp("timestamp", "yyyyMMddHHmmss")).show(3)

AnalysisException: "cannot resolve 'unix_timestamp(`timestamp`, 'yyyyMMddHHmmss')' due to data type mismatch: argument 1 requires (string or date or timestamp) type, however, '`timestamp`' is of bigint type.;;\n'Project [ip#0, timestamp#1L, url#2, size#3, code#4, ua#5, region#51, domain#151, unix_timestamp(timestamp#1L, yyyyMMddHHmmss) AS ts#246]\n+- Project [ip#0, timestamp#1L, url#2, size#3, code#4, ua#5, region#51, regexp_extract(url#2, http:\\/\\/(.*)\\/, 1) AS domain#151]\n   +- Repartition 4, true\n      +- Project [ip#0, timestamp#1L, url#2, size#3, code#4, ua#5, region#51]\n         +- Join Inner, (ip#0 = ip#50)\n            :- Repartition 4, true\n            :  +- Relation[ip#0,timestamp#1L,url#2,size#3,code#4,ua#5] csv\n            +- Relation[ip#50,region#51] csv\n"

In [19]:
log_with_unixtimestamp = (
    log_with_domains
    .withColumn("ts", F.unix_timestamp(F.col("timestamp").cast("string"), "yyyyMMddHHmmss"))
    .drop("timestamp", "url", "size", "code", "ua", "region", "domain")
)

log_with_unixtimestamp.show(3)

+------------+----------+
|          ip|        ts|
+------------+----------+
|3.183.113.77|1388687441|
|3.183.113.77|1388687441|
|3.183.113.77|1388687441|
+------------+----------+
only showing top 3 rows



In [21]:
log_with_unixtimestamp.printSchema()

root
 |-- ip: string (nullable = true)
 |-- ts: long (nullable = true)



In [28]:
(
log_with_unixtimestamp
    .groupBy("ip")
    .agg(F.min("ts").alias("begin"), F.max("ts").alias("end"))
    .select("ip", (F.col("end") - F.col("begin")).alias("seconds_overall"))
    .select("ip", (F.col("seconds_overall") / 60.0 / 60.0 / 24.0).alias("days"))
    .orderBy("days", ascending = False)
    .show(5)
)

+---------------+------------------+
|             ip|              days|
+---------------+------------------+
|  33.49.147.163|115.72543981481482|
| 197.72.248.141|115.62893518518518|
|  75.208.40.166|115.60479166666666|
|247.182.249.253|115.47098379629631|
| 181.217.177.35|115.46608796296296|
+---------------+------------------+
only showing top 5 rows



In [29]:
(
log_with_unixtimestamp
    .groupBy("ip")
    .agg(F.min("ts").alias("begin"), F.max("ts").alias("end"))
    .select("ip", (F.col("end") - F.col("begin")).alias("seconds_overall"))
    .select("ip", (F.col("seconds_overall") / 60.0 / 60.0 / 24.0).alias("days"))
    .show(5)
)

+--------------+------------------+
|            ip|              days|
+--------------+------------------+
|  3.183.113.77|115.00466435185184|
|168.255.93.197|115.05512731481481|
|222.131.187.37|115.40369212962963|
|56.167.169.126|114.50332175925926|
| 33.49.147.163|115.72543981481482|
+--------------+------------------+
only showing top 5 rows



In [31]:
(
log_with_unixtimestamp
    .groupBy("ip")
    .agg(F.min("ts").alias("begin"), F.max("ts").alias("end"))
    .select("ip", (F.col("end") - F.col("begin")).alias("seconds_overall"))
    .select("ip", (F.col("seconds_overall") / 60.0 / 60.0 / 24.0).alias("days"))
    .select(F.avg("days"))
    .show(5)
)

+------------------+
|         avg(days)|
+------------------+
|114.51553513071893|
+------------------+



## Window functions

Let us count the amount of user's sessions

In [32]:
from pyspark.sql import Window

(
log_with_unixtimestamp
    .select("ip", "ts", F.count("*").over(Window.partitionBy("ip")).alias("count"))
    .orderBy("count")
    .show(10)
)

+------------+----------+-----+
|          ip|        ts|count|
+------------+----------+-----+
|25.62.10.220|1388835012| 7020|
|25.62.10.220|1388835012| 7020|
|25.62.10.220|1388835012| 7020|
|25.62.10.220|1388835012| 7020|
|25.62.10.220|1388835012| 7020|
|25.62.10.220|1388835012| 7020|
|25.62.10.220|1388835012| 7020|
|25.62.10.220|1388835012| 7020|
|25.62.10.220|1388835012| 7020|
|25.62.10.220|1388835012| 7020|
+------------+----------+-----+
only showing top 10 rows



In [33]:
# user_window = Window.orderBy("ts").partitionBy("ip")
user_window = Window.partitionBy("ip").orderBy("ts")

In [34]:
(
    log_with_unixtimestamp.select(
        "ip",
        "ts",
        F.row_number().over(user_window).alias("row_number"),
        F.lag("ts").over(user_window).alias("lag"),
        F.lead("ts").over(user_window).alias("lead"),
    )
    .show(10)
)

+------------+----------+----------+----------+----------+
|          ip|        ts|row_number|       lag|      lead|
+------------+----------+----------+----------+----------+
|3.183.113.77|1388594462|         1|      null|1388594462|
|3.183.113.77|1388594462|         2|1388594462|1388594462|
|3.183.113.77|1388594462|         3|1388594462|1388594462|
|3.183.113.77|1388594462|         4|1388594462|1388594462|
|3.183.113.77|1388594462|         5|1388594462|1388594462|
|3.183.113.77|1388594462|         6|1388594462|1388594462|
|3.183.113.77|1388594462|         7|1388594462|1388594462|
|3.183.113.77|1388594462|         8|1388594462|1388594462|
|3.183.113.77|1388594462|         9|1388594462|1388594462|
|3.183.113.77|1388594462|        10|1388594462|1388594462|
+------------+----------+----------+----------+----------+
only showing top 10 rows



In [35]:
(
log_with_unixtimestamp
    .select("ip", "ts", F.lead("ts").over(user_window).alias("lead"))
    .select("ip", "ts", (F.col("lead") - F.col("ts")).alias("diff"))
    .where("diff > 60 * 30 or diff IS NULL")
    .groupBy("ip")
    .count()
    .orderBy(F.col("count").desc())
    .show(30)
)

+---------------+-----+
|             ip|count|
+---------------+-----+
|  75.208.40.166| 1363|
| 197.72.248.141| 1101|
|  33.49.147.163|  993|
| 222.131.187.37|  711|
|135.124.143.193|  653|
| 168.255.93.197|  589|
| 56.167.169.126|  579|
|   49.203.96.67|  526|
|   49.105.15.79|  480|
| 110.91.102.196|  362|
|247.182.249.253|  270|
| 231.119.88.198|  207|
|   25.62.10.220|  152|
| 181.217.177.35|  137|
| 168.146.187.80|  104|
|   3.183.113.77|   99|
|    14.8.59.211|   57|
+---------------+-----+



In [36]:
%%time
log_with_unixtimestamp.coalesce(1).write.csv("log_with_unixtimestamp.csv", sep="\t")

CPU times: user 3.28 ms, sys: 3.92 ms, total: 7.2 ms
Wall time: 18.1 s


In [37]:
%%time
log_with_unixtimestamp.write.csv("log_with_unixtimestamp_no_coal.csv", sep="\t")

CPU times: user 12.7 ms, sys: 156 µs, total: 12.8 ms
Wall time: 8.52 s


In [38]:
spark.stop()