### RDD

In [None]:
# Create an Object
from pyspark.sql import SparkSession
# Build (or reuse) a session with Hive support, app name "spark sql"
# and master "local".
spark_session = (
    SparkSession.builder
    .enableHiveSupport()
    .appName("spark sql")
    .master("local")
    .getOrCreate()
)

In [None]:
# Read the text file at the given path into an RDD via the session's SparkContext
geoip_rdd = (
    spark_session.sparkContext
    .textFile("/user/pmezentsev/geoip")
)

In [None]:
# Peek at the first three elements of the RDD
geoip_rdd.take(num=3)

In [None]:
# Preprocess: break every line into a list of fields on the ", " delimiter
geoip_rdd1 = geoip_rdd.map(lambda line: line.split(", "))

### spark dataframe

In [None]:
# Define a schema
from pyspark.sql.types import *
# Three string columns: ip, code, country
schema = StructType([
    StructField("ip", StringType()),
    StructField("code", StringType()),
    StructField("country", StringType()),
])

In [None]:
# RDD + schema -> Spark DataFrame
geoip_df = spark_session.createDataFrame(geoip_rdd1, schema=schema)

# Print the first three rows
geoip_df.show(n=3)

# Look at the RDD underlying the DataFrame
geoip_df.rdd.take(num=2)

# Print the column names and types
geoip_df.printSchema()

In [None]:
# Spark DataFrame -> pandas DataFrame
# (careful: the whole DataFrame is materialized and may run out of memory)
geoip_pd = geoip_df.toPandas()
geoip_pd.head(n=3)

# pandas DataFrame -> Spark DataFrame
geoip_df = spark_session.createDataFrame(data=geoip_pd)

### Process df as sql

In [None]:
# SQL-like query expressed through the DataFrame API:
# two columns, rows filtered by a SQL expression string
(
    geoip_df
    .select("country", "ip")
    .where("country = 'Russian Federation'")
    .show(3)
)

In [None]:
# Register the DataFrame as a temporary SQL view named "geoip".
# (A table keeps the data; a view recomputes the data upon request.)
# createOrReplaceTempView is used instead of createTempView so that re-running
# this cell does not fail because the view already exists.
geoip_df.createOrReplaceTempView("geoip")

# Query the view with SQL
spark_session.sql("""
    select country from geoip
""").show(3)

### Working w/ Hive

In [None]:
# List Hive metadata with SQL; each result is converted to a pandas DataFrame.
# Note: only the last expression of a notebook cell is displayed.
## all databases
spark_session.sql("""
    show databases
""").toPandas()

## tables in the "web" database
spark_session.sql("""
    show tables in web
""").toPandas()

In [None]:
# List Hive metadata through the catalog API
## every database
spark_session.catalog.listDatabases()

## tables in the "web" database
spark_session.catalog.listTables(dbName="web")

In [None]:
# Save the result of a Spark SQL query as a Hive table:
# web.geoip is created from every row and column of the "geoip" view.
spark_session.sql("""
    create table
        web.geoip as
    select
        *
    from
        geoip
""")

In [None]:
# DataFrame -> Hive table web.geoip, using the "overwrite" save mode
geoip_df.write.mode("overwrite").saveAsTable("web.geoip")

# DataFrame -> CSV files under the path "geoip"
geoip_df.write.format("csv").save("geoip")

### RDD vs spark df vs SQL

In [None]:
# DF > SQL > RDD
# (Kept as a comment: this is a note, not code. As a bare statement it raises
# NameError because DF, SQL and RDD are not defined names.)

### Projection(columns) and filtering(rows)

#### projection

In [None]:
# SQL projection: keep only the ip and url columns of web.access_log,
# then take three rows and convert them to a pandas DataFrame.
spark_session.sql("""
    select
        ip, url
    from
        web.access_log
""").limit(3).toPandas()

In [None]:
# Spark DataFrame projection (choosing columns)
access_log_df = spark_session.read.table("web.access_log")

# by column name
(
    access_log_df
    .select("ip", "url")
    .limit(3)
    .toPandas()
)

# or by Column attribute
(
    access_log_df
    .select(access_log_df.ip, access_log_df.url)
    .limit(3)
    .toPandas()
)

# or by indexing the DataFrame with a list of column names.
# Fixed: the original wrote `.[["url", "ip"]]` on a continuation line, which is
# a syntax error; the index must be applied directly to the DataFrame.
(
    access_log_df[["url", "ip"]]
    .limit(3)
    .toPandas()
)

# change column names with alias()
(
    access_log_df
    .select(
        access_log_df.ip,
        access_log_df.url.alias("url_part"),
    )
    .limit(3)
    .toPandas()
)

# same projection, referring to columns through pyspark.sql.functions.col
import pyspark.sql.functions as f
(
    access_log_df
    .select(
        f.col("ip"),
        f.col("url").alias("url_part"),
    )
    .limit(3)
    .toPandas()
)

#### filtering

In [None]:
# SQL: rows whose http_code is not '200'
spark_session.sql("""
    select
        *
    from
        web.access_log
    where
        http_code <> '200'
""").limit(3).toPandas()

# Spark DataFrame equivalent
access_log_df = spark_session.read.table("web.access_log")

# filter given as a SQL expression string (here `<>` is SQL, so it is fine)
(
    access_log_df
    .where("http_code <> '200'")
    .limit(3)
    .toPandas()
)

# or filter given as a Column expression.
# Fixed: `<>` is not a Python 3 operator (SyntaxError); `!=` builds the
# Column inequality and also works on Python 2.
(
    access_log_df
    .where(access_log_df.http_code != '200')
    .limit(3)
    .toPandas()
)

In [None]:
# SQL: two conditions combined with `and`
spark_session.sql("""
    select
        *
    from
        web.access_log
    where
        http_code <> '200' and
        user_agent like '%Android%'
""").limit(3).toPandas()

# Spark DataFrame equivalent: Column conditions are combined with `&`, and each
# condition needs its own parentheses because `&` binds tighter than `!=`.
# Fixed: `<>` is not a Python 3 operator (SyntaxError); use `!=`.
(
    access_log_df
    .where(
        (access_log_df.http_code != '200') &
        (access_log_df.user_agent.like('%Android%'))
    )
    .limit(3)
    .toPandas()
)

#### functions
map: n -> n
generating: n -> m (n<m)
aggregating: m -> n (m>n)

##### map

In [None]:
# SQL: length() yields one value per input row (n -> n)
spark_session.sql("""
    select
        user_agent,
        length(user_agent) as len
    from
        web.access_log
    limit
        3
""").toPandas()

# Spark DataFrame equivalent
import pyspark.sql.functions as f
access_log_df = spark_session.read.table("web.access_log")

(
    access_log_df
    .select("user_agent", f.length("user_agent").alias("len"))
    .limit(3)
    .toPandas()
)

In [None]:
# concat: prefix every url with the literal "http://vk.com"
(
    access_log_df
    .select(
        "url",
        f.concat(f.lit("http://vk.com"), access_log_df.url),
    )
    .limit(3)
    .toPandas()
)

In [None]:
# split: turn user_agent into a list of space-separated parts
(
    access_log_df
    .select("user_agent")
    .select(
        "user_agent",
        f.split("user_agent", " ").alias("list"),
    )
    .limit(3)
    .toPandas()
)

In [None]:
# explode: one output row per element of the list produced by split.
# The exploded column is not aliased here, so the filter below refers to it
# by explode's default column name "col".
(
    access_log_df
    .select(
        "user_agent",
        f.split("user_agent", " ").alias("list"),
    )
    .select(
        "user_agent",
        f.explode("list"),
    )
    .where(f.col("col") == "Android")
    .limit(3)
    .toPandas()
)

In [None]:
# when / otherwise: label each row "Android" or "Other" in a new OS column
(
    access_log_df
    .select(
        "user_agent",
        f.when(access_log_df.user_agent.like("%Android%"), "Android")
        .otherwise("Other")
        .alias("OS"),
    )
    .limit(3)
    .toPandas()
)

##### aggregating

In [None]:
# SQL: number of ip values per url
spark_session.sql("""
    select
        url,
        count(ip)
    from
        web.access_log
    group by
        url
    limit
        3
""").toPandas()

# Spark DataFrame equivalent
import pyspark.sql.functions as f
access_log_df = spark_session.read.table("web.access_log")

# aggregate given as a function call
(
    access_log_df
    .groupBy("url")
    .agg(f.count("ip"))
    .limit(3)
    .toPandas()
)

# or aggregate given as a {column: function-name} dict
(
    access_log_df
    .groupBy("url")
    .agg({"ip": "count"})
    .limit(3)
    .toPandas()
)

In [None]:
# SQL: number of distinct ips per (url, http_code) pair
spark_session.sql("""
    select
        url,
        http_code,
        count(distinct ip)
    from
        web.access_log
    group by
        url,
        http_code
    limit
        3
""").toPandas()

# Spark DataFrame equivalent.
# Fixed: the SQL above counts DISTINCT ips, but the original DataFrame version
# used f.count("ip"), which counts every non-null ip; countDistinct matches it.
(
    access_log_df
    .groupBy("url", "http_code")
    .agg(f.countDistinct("ip"))
    .limit(3)
    .toPandas()
)

In [None]:
# Spark DataFrame: the grouping key may be an expression (here the url length)
(
    access_log_df
    .groupBy(f.length("url"))
    .agg(f.count("ip"))
    .limit(3)
    .toPandas()
)

In [None]:
# Aggregate the whole table (n -> 1): groupBy() with no keys forms one group.
# Fixed: the original continuation line read `groupBy()` without the leading
# dot, which is a syntax error.
(
    access_log_df
    .groupBy()
    .agg(f.count("*"))
    .limit(3)
    .toPandas()
)

In [None]:
# Word count: find the most frequent words in user_agent.
# Fixed: the exploded column was aliased "words" but then grouped by "word",
# a column that did not exist; the exploded column is now named "word".
(
    access_log_df
    .select(f.split("user_agent", " ").alias("words"))  # list of words per row
    .select(f.explode("words").alias("word"))           # one row per word
    .groupBy("word")
    .agg(f.count("*").alias("count"))
    .orderBy(f.col("count").desc())                     # most frequent first
    .limit(3)
    .toPandas()
)

##### join

In [None]:
# SQL: inner join of the access log with the geoip table on ip
spark_session.sql("""
    select
        *
    from
        web.access_log as l
    join
        web.geoip as g
    on
        l.ip = g.ip
""").limit(3).toPandas()

# Spark DataFrame equivalent
import pyspark.sql.functions as f
access_log_df = spark_session.read.table("web.access_log")

# Fixed: the join condition referred to `access_log` and `geoip`, which are not
# defined names; the DataFrames being joined are access_log_df and geoip_df.
(
    access_log_df
    .join(geoip_df, on=(access_log_df.ip == geoip_df.ip))
    .limit(3)
    .toPandas()
)

In [None]:
# Join on the shared "ip" column, then count distinct ips per country
(
    access_log_df
    .join(geoip_df, on="ip")
    .groupby("country")
    .agg(f.countDistinct("ip").alias("cnt"))
    .limit(3)
    .toPandas()
)

In [None]:
# Left join on the "ip" column
(
    access_log_df
    .join(geoip_df, on="ip", how="left")
    .limit(3)
    .toPandas()
)

### User defined functions

In [None]:
# Word count with a user defined function: imports and source table
import pyspark.sql.functions as f

access_log_df = spark_session.read.table(tableName="web.access_log")

def parse_user_agent_udf(user_agent):
    """Split a user-agent string into lowercase word tokens.

    Runs of digits, dots and underscores (with an optional leading slash) and
    the characters ``; ( ) : ,`` are removed before splitting on whitespace.

    Args:
        user_agent: the raw user-agent string, or None.

    Returns:
        A list of lowercase tokens; an empty list for None or blank input.
    """
    # Imported locally because the visible notebook never imports `re`,
    # so the original body raised NameError when called.
    import re

    # None would make re.sub raise TypeError; treat it as "no words".
    if user_agent is None:
        return []

    # Raw strings keep the regex escapes from being read as string escapes.
    user_agent = re.sub(r"/?[\d_.]+", "", user_agent)  # remove numbers, points, slashes, underlines
    user_agent = re.sub(r"[;\(\):,]", "", user_agent)  # remove ; ( ) : ,
    return user_agent.lower().split()


# Fixed: parse_user_agent_udf is a plain Python function, so calling it with
# "user_agent" parsed that literal string and returned a Python list, which has
# no .alias(). It must be wrapped with f.udf (with its return type declared)
# to be applied to a column.
import re  # parse_user_agent_udf relies on the `re` module
from pyspark.sql.types import ArrayType, StringType

parse_user_agent = f.udf(parse_user_agent_udf, ArrayType(StringType()))

(
    access_log_df
    .select(parse_user_agent("user_agent").alias("words"))  # list of words per row
    .select(f.explode("words").alias("word"))               # one row per word
    .groupBy("word")
    .agg(f.count("*").alias("count"))
    .orderBy(f.col("count").desc())                         # most frequent first
    .limit(3)
    .toPandas()
)

    

# User defined function used from SQL.
# Fixed: SQL can only call a Python function that has been registered under a
# name; the original query used parse_user_agent_udf without registering it.
import re  # parse_user_agent_udf relies on the `re` module
from pyspark.sql.types import ArrayType, StringType

spark_session.udf.register(
    "parse_user_agent_udf",
    parse_user_agent_udf,
    ArrayType(StringType()),
)

# SQL
spark_session.sql("""
    select
        word,
        count(*) as cnt
    from (
        select
            explode(parse_user_agent_udf(user_agent)) as word
        from
            web.access_log
    ) as s
    group by 
        word
    order by
        cnt desc
""").limit(3).toPandas()

### Time Processing

In [None]:
# Extract time: parse the textual "time" column into Unix seconds.
# Fixed: access_log_unix_time was used here and in later cells but is not
# assigned anywhere in the visible notebook, so the result is now stored.
# NOTE(review): assumes the source is access_log_df (web.access_log) — confirm.
access_log_unix_time = access_log_df.withColumn(
    "unixtime",
    f.unix_timestamp("time", "dd/MMM/yyyy:HH:mm:ss Z"),
)

access_log_unix_time.limit(5).toPandas()

In [None]:
# Process: per ip, the span between first and last request, in days (+1).
# Fixed: .alias("seconds_cnt") sat after the closing parenthesis of select(),
# so it aliased the DataFrame rather than the column, and the next select could
# not find a "seconds_cnt" column. The alias is now on the difference itself.
(
    access_log_unix_time
    .groupby("ip")
    .agg(
        f.min("unixtime").alias("begin"),
        f.max("unixtime").alias("end"),
    )
    .select(
        "ip",
        (f.col("end") - f.col("begin")).alias("seconds_cnt"),
    )
    .select(
        "ip",
        f.col("seconds_cnt") / 60.0 / 60.0 / 24.0 + 1,  # seconds -> days
    )
    .limit(5)
    .toPandas()
)

In [None]:
# Unixtime -> timestamp
# Fixed: the original ended this assignment with .limit(5).toPandas(), which
# made access_log_timestamp a 5-row pandas DataFrame; the Spark-style
# groupby/agg with pyspark functions below needs a Spark DataFrame.
access_log_timestamp = access_log_unix_time.withColumn(
    "timestamp",
    f.col("unixtime").astype("timestamp"),
)

# Note: datediff only works with timestamps
(
    access_log_timestamp
    .groupby("ip")
    .agg(
        f.min("timestamp").alias("begin"),
        f.max("timestamp").alias("end"),
    )
    .select(
        "ip",
        f.datediff("end", "begin").alias("days_cnt"),
    )
    .limit(5)
    .toPandas()
)

### Windows functions
- A special kind of aggregation function

| |  aggregation | window function |
|:---:|:---:|:---:|
| applied to | whole table | column  |
| number of rows | reduces | remains unchanged |
| grouping condition | goes first | goes last |
| values in a group | df.groupby(...).agg(...) <br> unordered | func("column").over(...) <br> ordered |

In [None]:
# Window function: attach to every row the number of rows sharing its ip.
# Unlike groupby().agg(), the number of rows stays unchanged.
# Fixed: Window was used without being imported anywhere in the visible notebook.
from pyspark.sql import Window

(
    access_log_timestamp
    .select(
        "ip",
        "time",
        f.count("*").over(Window.partitionBy("ip")).alias("cnt"),
    )
    .limit(5)
    .toPandas()
)

### 2-D distribution