In [None]:
import os
import sys
os.environ["PYSPARK_SUBMIT_ARGS"]='--conf spark.sql.catalogImplementation=in-memory pyspark-shell'
os.environ["PYSPARK_PYTHON"]='/opt/anaconda/envs/bd9/bin/python'
os.environ["SPARK_HOME"]='/usr/hdp/current/spark2-client'

spark_home = os.environ.get('SPARK_HOME', None)
if not spark_home:
    raise ValueError('SPARK_HOME environment variable is not set')
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.10.7-src.zip'))
exec(open(os.path.join(spark_home, 'python/pyspark/shell.py')).read())

In [None]:
%pylab inline

## How to create a DataFrame?

### Read from an input source

In [None]:
spark.read

### The loading pipeline
```python
spark.read\
     .format(...)\
     .option(key, value)\
     .option(key, value)\
     .load(path)
```

In [None]:
df = spark.read\
          .format("csv")\
          .option("sep", "|")\
          .load("/user/pavel.klemenkov/lectures/lecture02/data/ml-100k/u.user")

In [None]:
df

In [None]:
df.show(1)

In [None]:
df.take(5)

### Schema!

In [None]:
from pyspark.sql.types import *

In [None]:
schema = StructType(fields=[
    StructField("user_id", IntegerType()),
    StructField("age", IntegerType()),
    StructField("gender", StringType()),
    StructField("occupation", StringType()),
    StructField("zip", IntegerType())
])

In [None]:
df = spark.read\
          .schema(schema)\
          .format("csv")\
          .option("sep", "|")\
          .load("/user/pavel.klemenkov/lectures/lecture02/data/ml-100k/u.user")

In [None]:
df

In [None]:
df.printSchema()

In [None]:
df.show(5)

In [None]:
df.summary().show()

### So, the actual loading pipeline is
```python
spark.read\
     .schema(schema)\
     .format(...)\
     .option(key, value)\
     .option(key, value)\
     .load(path)
```

### There are also some convenient wrappers

In [None]:
df = spark.read.csv("/user/pavel.klemenkov/lectures/lecture02/data/ml-100k/u.user", schema=schema, sep="|")

In [None]:
df

In [None]:
df.show(5)

### Tons of data sources with a unified API!
+ CSV
+ JSON
+ Hive
+ HBase
+ Cassandra
+ MySQL
+ PostgreSQL
+ Parquet
+ ORC
+ Kafka
+ ElasticSearch
+ Amazon S3
+ ...and more through custom connectors

### Create a DataFrame from an RDD, pandas.DataFrame or a list

In [None]:
rdd = sc.textFile("/user/pavel.klemenkov/lectures/lecture02/data/ml-100k/u.user").map(lambda x: x.split("|"))

In [None]:
rdd.take(5)

In [None]:
df = spark.createDataFrame(rdd)

In [None]:
df

In [None]:
df = spark.createDataFrame(rdd, schema=schema)

In [None]:
df

In [None]:
df.show(5)

### Type mismatch, maybe we should ignore schema verification??

In [None]:
df = spark.createDataFrame(rdd, schema=schema, verifySchema=False)

In [None]:
df.show(5)

### No chance, have to convert data to proper types

In [None]:
rdd = rdd.map(lambda x: (int(x[0]), int(x[1]), x[2], x[3], int(x[4])))

In [None]:
df = spark.createDataFrame(rdd, schema=schema)

In [None]:
df.show(5)

## We will work with an artificial access log

In [None]:
!hdfs dfs -tail /user/pavel.klemenkov/lectures/lecture02/data/logsM.txt

In [None]:
log_schema = StructType(fields=[
    StructField("ip", StringType()),
    StructField("timestamp", LongType()),
    StructField("url", StringType()),
    StructField("size", IntegerType()),
    StructField("code", IntegerType()),
    StructField("ua", StringType())
])

In [None]:
log = spark.read.csv("/user/pavel.klemenkov/lectures/lecture02/data/logsM.txt", sep="\t", schema=log_schema).cache()

In [None]:
log

In [None]:
log.rdd.getNumPartitions()

In [None]:
log = log.repartition(4)

In [None]:
log.show(5, vertical=True, truncate=False)

## Projections and filters
Projection is a subset of columns

Filter is a asubset of rows

In [None]:
log.schema.fieldNames()

In [None]:
log.select(["ip", "timestamp", "url"])

In [None]:
log.select(*log.schema.fieldNames()[:3]).show(5)

In [None]:
log.select("ip", "code").show(5)

In [None]:
 log.select(log.ip, log.code).show(5)

In [None]:
log.ip

## Aliasing

In [None]:
log.select(log.ip,
           log.code.alias("response")).show(5)

In [None]:
import pyspark.sql.functions as f

In [None]:
log.select("ip", 
           f.col("code").alias("response")).show(5)

## Good ol' Pandas

In [None]:
log[["ip", "code"]].show(5)

In [None]:
log[[log.ip, log.code.alias("response")]].show(5)

## Filtering

In [None]:
log.where("code == 200").show(5)

In [None]:
log.filter(log.code == 200).show(5, truncate=False)

In [None]:
log.filter("code == 200 AND url LIKE '%rambler%'").show(5, truncate=False, vertical=True)

In [None]:
log.filter((log.code.isin([200, 404])) & (log.url.like("%rambler%"))).show(5)

## Good ol' Pandas

In [None]:
log[(log.code == 200) & (log.url.like("%rambler%"))].show(5)

## Alltogether

In [None]:
log[(log.code == 200) & (log.url.like("%rambler%"))][["ip", "code"]].show(5)

## What about SQL?! It is there :)

In [None]:
query = """
SELECT ip, code FROM log_table 
WHERE code == 200 AND url LIKE '%rambler%'
"""

In [None]:
spark.sql(query).show(5)

### You have to register your DataFrame as a table

In [None]:
log.registerTempTable("log_table")

In [None]:
spark.sql(query).show(5)

## Functions
Three types:
+ mapping (one to one)
+ generating (one to many)
+ aggregating (many to one)

In [None]:
log.select("ua", f.length("ua")).show(5)

In [None]:
log.select("ua", f.length("ua").alias("length")).show(5)

In [None]:
log.select(f.concat("url", "?utm_medium=email")).show(5)

**`concat` needs `Column` type as argument. `lit` creates new `Column` from a literal value**

In [None]:
log.select(f.concat("url", f.lit("?utm_medium=email")).alias("newurl")).show(5, False)

## Explosions!

In [None]:
log.select("ua", f.split("ua", " ").alias("word_list")).show(5, False, True)

**You can even select individual elements of the list!**

In [None]:
log.select("ua", f.split("ua", " ").alias("word_list"))\
   .select(f.col("word_list")[0], f.col("word_list")[1])\
   .show(5)

In [None]:
log.select("ua", f.split("ua", " ").alias("word_list"))\
   .select(f.explode("word_list").alias("word")).groupby("word").count().orderBy("count", ascending=False).show(5)

In [None]:
log1 = log.select("ua", f.split("ua", " ").alias("word_list"))

In [None]:
log1.show(5, vertical=True, truncate=False)

In [None]:
log2 = log1.select(f.explode("word_list").alias("words"))

In [None]:
log2.groupBy("words").count().orderBy("count", ascending=False).show(5)

## Joins

In [None]:
!hdfs dfs -tail /user/pavel.klemenkov/lectures/lecture02/data/ipDataM.txt

In [None]:
ip_schema = StructType(fields=[
    StructField("ip", StringType()),
    StructField("region", StringType())
])

In [None]:
ips = spark.read.csv("/user/pavel.klemenkov/lectures/lecture02/data/ipDataM.txt", schema=ip_schema, sep="\t").cache()

In [None]:
ips.show(5)

## This is the trick to disable automatic broadcasts for joins. More on this at the end

In [None]:
spark.sql("SET spark.sql.autoBroadcastJoinThreshold = 100500")

In [None]:
log_with_regions = log.join(ips, on="ip", how="inner")

In [None]:
log_with_regions

In [None]:
log_with_regions.show(5)

In [None]:
log_with_regions.rdd.getNumPartitions()

In [None]:
log_with_regions = log_with_regions.coalesce(4).cache()

## Query planner uses SortMergeJoin by default

In [None]:
log_with_regions.explain()

## Broadcast hint

In [None]:
log_with_regions = log.join(f.broadcast(ips), on="ip", how="inner")

In [None]:
log_with_regions.explain()

## Or use the new method `hint()`

In [None]:
log_with_regions = log.join(ips.hint("broadcast"), on="ip", how="inner").cache()

In [None]:
log_with_regions.explain(True)

## Aggregations

## The aggregation pipeline is as follows:
```python
df.groupBy(*cols)\
  .agg(*expressions)
```

In [None]:
log_with_regions.groupBy("region")\
                .agg(f.count("ip").alias("count"))\
                .orderBy("count", ascending=False)\
                .show(10)

In [None]:
log_with_regions.groupBy("region")\
                .count()\
                .withColumnRenamed("count", "row_count")\
                .show(10)

In [None]:
log_with_regions.groupBy("region")\
                .count()\
                .withColumnRenamed("count", "row_count")\
                .orderBy("row_count", ascending=False)\
                .show(10)

In [None]:
length_stat = log_with_regions.groupBy(f.length("url").alias("url_length"))\
                              .agg(f.count("*").alias("row_count"))\
                              .orderBy("row_count", ascending=False)\
                              .toPandas()

In [None]:
length_stat

In [None]:
length_stat.plot(kind="bar", x="url_length", y="row_count")

In [None]:
log_with_domains = log_with_regions.withColumn("domain", f.regexp_extract("url", "http:\/\/(.*)\/", 1))

In [None]:
log_with_domains.show(5, False, True)

In [None]:
length_stat = log_with_domains.groupBy(f.length("url").alias("url_length"), "domain")\
                              .agg(f.count("*").alias("row_count"))\
                              .orderBy("row_count", ascending=False)\
                              .toPandas()

In [None]:
length_stat

In [None]:
log_with_domains[log_with_domains.domain == "lenta.ru"][["url"]].show(5, False)

In [None]:
log_with_domains.withColumn("url_length", f.length("url")).corr("domain", "url_length")

## User Defined Functions
The function type of the UDF can be one of the following:
+ **SCALAR**. A scalar UDF defines a transformation: One or more `pandas.Series` -> A `pandas.Series`. calar UDFs are used with `pyspark.sql.DataFrame.withColumn()` and `pyspark.sql.DataFrame.select()`
+ **GROUPED_MAP**. A grouped map UDF defines transformation: A `pandas.DataFrame` -> A `pandas.DataFrame`. Grouped map UDFs are used with `pyspark.sql.GroupedData.apply()`

In [None]:
log_with_domains[["domain"]].distinct().collect()

In [None]:
@f.pandas_udf(IntegerType())
def encode_domain(domains):
    mapping = {
        'lenta.ru': 0,
        'newsru.com': 1,
        'news.mail.ru': 2,
        'news.yandex.ru': 3,
        'news.rambler.ru': 4
    }
    return domains.apply(lambda x: mapping.get(x))

In [None]:
log_with_domains.withColumn("domain_digit", encode_domain("domain")).take(20)

In [None]:
log_with_domains.withColumn("domain_encoded", encode_domain("domain"))\
                .withColumn("url_length", f.length("url"))\
                .corr("url_length", "domain_encoded")

## Working with time. Count number of days users visited our site

In [None]:
log_with_domains[["timestamp"]].show(5)

In [None]:
log_with_domains.withColumn("ts", f.unix_timestamp("timestamp", "yyyyMMddHHmmss")).show(5)

In [None]:
log_with_unixts = log_with_domains\
                      .withColumn("ts", f.unix_timestamp(f.col("timestamp").cast("string"), "yyyyMMddHHmmss"))\
                      .drop("timestamp", "url", "size", "code", "ua", "region", "domain")

In [None]:
log_with_unixts.show(5)

In [None]:
log_with_unixts.groupBy("ip")\
               .agg(f.min("ts").alias("begin"),
                    f.max("ts").alias("end"))\
               .select("ip", (f.col("end") - f.col("begin")).alias("sec_count"))\
               .select("ip", (f.col("sec_count") / 60.0 / 60.0 / 24.0).alias("days"))\
               .show(5)

In [None]:
log_with_unixts.groupBy("ip")\
               .agg(f.min("ts").astype("timestamp").alias("begin"),
                    f.max("ts").astype("timestamp").alias("end"))\
               .select("ip", (f.datediff("end", "begin").alias("days")))\
               .show(5)

## Count user sessions

In [None]:
from pyspark.sql import Window

In [None]:
log_with_unixts.select("ip", "ts", f.count("*").over(Window.partitionBy("ip")).alias("cnt"))\
               .orderBy("cnt").show(10)

## Window functions allow to order values inside groups, so you have cool new functions like:
+ `first()`
+ `last()`
+ `lag()`
+ `lead()`

In [None]:
user_window = Window.orderBy("ts").partitionBy("ip")

In [None]:
log_with_unixts.select("ip", "ts",
                       f.row_number().over(user_window).alias("count"),
                       f.lag("ts").over(user_window).alias("lag"),
                       f.lead("ts").over(user_window).alias("lead"))\
               .show(10)

In [None]:
log_with_unixts.select("ip", "ts",
                       f.lead("ts").over(user_window).alias("lead"))\
               .select("ip", "ts",
                       (f.col("lead") - f.col("ts")).alias("diff"))\
               .where("diff >= 1800 or diff is NULL")\
               .groupBy("ip").count()\
               .orderBy(f.col("count").desc())\
               .show(30)

In [None]:
log_with_unixts.coalesce(1).write.csv("log_unixts.tsv", sep="\t")

In [None]:
spark.stop()