# Spark SQL cheatsheet

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as f

# One local-mode session; Hive support lets the notebook read and create
# tables in the Hive metastore (e.g. web.access_logs, web.geoip below).
spark = (
    SparkSession.builder
    .appName("Spark sql")
    .master("local")
    .enableHiveSupport()
    .getOrCreate()
)

### DataFrame creation

#### From an RDD + schema

In [None]:
# Each line of the file becomes a list of fields; the split assumes fields are
# separated by a comma followed by a single space (", ").
my_rdd = spark.sparkContext.textFile("my_text_file.csv").map(lambda x: x.split(", "))

# The schema must list the fields in the same order (and number) as the split above.
schema = (StructType()
          .add("ip", StringType())
          .add("code", StringType())
          .add("country", StringType())
          )

df = spark.createDataFrame(my_rdd, schema)

#### From a Pandas dataframe

In [None]:
# Column names and types are taken from the pandas DataFrame.
df = spark.createDataFrame(data=pandas_df)

#### From Hive

In [None]:
# The same Hive table, loaded once through SQL and once by name.
spark.sql("select country,ip, code from web.access_logs")
spark.table("web.access_logs")

#### From a file

In [None]:
spark.read.json("json_file")
# inferSchema makes Spark scan the data an extra time to guess column types;
# for large or production inputs, pass an explicit schema with .schema(...) instead.
spark.read.option("inferSchema", "true").option("header", "true").csv("csv_file")
spark.read.parquet("parquet_file")

### Exploring a dataframe

In [None]:
# Print the DataFrame's column names and types as a tree.
df.printSchema()

In [None]:
# Confirm that df is a Spark DataFrame, not a pandas one.
type(df)

In [None]:
# Print the first 3 rows as a text table.
df.show(n=3)

In [None]:
# Go through the underlying RDD: returns the first 3 rows as a list of Row objects.
df.rdd.take(3)

In [None]:
# Limit first so only 3 rows are brought to the driver, then convert them to pandas.
df.limit(3).toPandas()

In [None]:
# First 3 rows as a list of Row objects (take(n) is limit(n).collect()).
df.take(3)

In [None]:
# Show the logical plans as well as the physical plan.
df.explain(extended=True)

### Projection and filtering

In [None]:
(df
 .select("country", "id_", "counts", "http_code", "user_agent")
 .withColumnRenamed("id_", "id")
 # String literals inside a SQL expression must be quoted; an unquoted
 # Colombia would be resolved as a column name.
 .where("country = 'Colombia'")
 # Python 3 has no `<>` operator; Column inequality is written with `!=`.
 .where(
     (df.http_code != '200') &
     (df.user_agent.like('%Android%')))
 .where(f.col("id") < f.col("counts"))
 .show()
)

DataFrames also support a pandas-like indexing interface:

In [None]:
# Indexing with a list of names selects those columns (same as df.select('country', 'ip')).
df[['country','ip']]

In [None]:
# Indexing with a boolean Column filters rows (same as df.where(...)).
# Python 3 has no `<>` operator; use `!=`.
df[df.http_code != '200']

### Functions

In [None]:
# Each user agent next to its character length.
df.select(f.col("user_agent"), f.length(f.col("user_agent")).alias("len"))

In [None]:
# Prefix each url with the host; without an alias, the new column gets
# Spark's auto-generated name.
df.select(f.col("url"), f.concat(f.lit("http://vk.com"), f.col("url")))

In [None]:
# Split each user agent on spaces, then emit one row per token
# (explode names its output column "col").
(df.select("user_agent", f.split(f.col("user_agent"), " ").alias("list"))
   .select("user_agent", f.explode(f.col("list"))))

In [None]:
# Coarse OS label from the user agent; the first matching condition wins.
df.select(
    "user_agent",
    f.when(f.col("user_agent").like("%Android%"), "Android")
    .when(f.col("user_agent").like("%Windows%"), "Windows")
    .otherwise("Other")
    .alias("OS"),
)

### Aggregation functions

In [None]:
# Number of non-null ip values per url; equivalent to .agg({"ip": "count"}).
df.groupBy("url").agg(f.count("ip"))

In [None]:
(df
 .groupBy("url")
 .agg(f.count("*").alias("count"),
      # The function is countDistinct (the misspelled countDistict does not exist).
      f.countDistinct("ip"),
      f.min("response_length"),
      f.max("response_length"),
      # astype is an alias of cast: the average is converted to an integer.
      f.avg("response_length").astype("int"))
 .orderBy(f.col("count").desc())
)

### Joins

In [None]:
# Left join on ip. Other `how` values include inner (the default), right,
# outer, left_semi and left_anti.
df1.join(df2, df1.ip == df2.ip, 'left')

In [None]:
# Cartesian product of df with itself: the result has (number of rows of df)**2
# rows, so this gets expensive quickly.
df.crossJoin(df).count()

### User Defined Functions

In [None]:
def my_function(url=None):
    """Decompose a URL into its host and non-empty path segments.

    Used below as a Spark UDF declared with ``ArrayType(StringType())``, so it
    must accept the column value as an argument and return a list of strings.
    Spark passes ``None`` for NULL cells; ``None`` is returned unchanged so the
    result is NULL as well. Query string and fragment are ignored.

    >>> my_function("http://vk.com/foo/bar?x=1")
    ['vk.com', 'foo', 'bar']
    """
    from urllib.parse import urlsplit

    if url is None:
        return None
    parts = urlsplit(url)
    # Empty strings come from a missing host, leading/trailing or doubled slashes.
    return [piece for piece in [parts.netloc, *parts.path.split("/")] if piece]

# Wrap the plain Python function as a Spark UDF returning array<string>.
my_function_udf = f.udf(my_function, returnType=ArrayType(StringType()))

df.select(my_function_udf(f.col("url")).alias("url_decomposed"))

UDFs wrapping a third-party Python library (`user_agents`)

In [None]:
import user_agents as ua

# Each UDF parses the raw User-Agent string and returns one family name
# (f.udf defaults to a StringType result). NULL user agents arrive as None
# and are returned as NULL instead of being handed to ua.parse.
get_browser_udf = f.udf(lambda x: ua.parse(x).browser.family if x is not None else None)
get_os_udf      = f.udf(lambda x: ua.parse(x).os.family if x is not None else None)
get_device_udf  = f.udf(lambda x: ua.parse(x).device.family if x is not None else None)
get_device_ud   = get_device_udf  # the original, misspelled name, kept as an alias

Register a UDF so it can be called from SQL

In [None]:
# UDFs are registered through spark.udf (SparkSession itself has no `register`);
# the registered name becomes callable from SQL.
spark.udf.register("my_function",
                   my_function,
                   ArrayType(StringType()))

spark.sql("select my_function(url) as url_decomposed from web.access_logs")

### Time processing

String to Unix time: seconds since 1970-01-01 00:00:00 UTC

In [None]:
# Pattern letters:
#   dd = day, MM = numeric month, yyyy = year, HH = hour (0-23),
#   mm = minutes, ss = seconds, Z = zone offset (e.g. +0300).
# If `time` uses month abbreviations (as Apache access logs do,
# e.g. 10/Oct/2000), use MMM instead of MM.
df.withColumn("unixtime", f.unix_timestamp("time",
                                          "dd/MM/yyyy:HH:mm:ss Z"))


Unix time to timestamp

In [None]:
# Casting a numeric column to timestamp interprets it as seconds since the epoch.
df.withColumn("timestamp", f.col("unixtime").cast("timestamp"))

Date diff

In [None]:
# Per ip: first and last timestamps seen, then the number of days between them.
(
    df.groupBy("ip")
    .agg(
        f.min("timestamp").alias("begin"),
        f.max("timestamp").alias("end"),
    )
    .select("ip", f.datediff("end", "begin").alias("days_cnt"))
)

### Window Functions

Compute aggregations over a window of rows related to the current row, while the number of rows of the original DataFrame stays unchanged.

In [None]:
from pyspark.sql import Window  # Window is not imported at the top of the notebook

# Rows are partitioned by ip and ordered by unixtime. Because the window has an
# ordering, its default frame runs from the start of the partition to the
# current row (rows with equal unixtime included), so `cnt` is a running
# count per ip rather than the partition total.
my_window = Window.partitionBy("ip").orderBy("unixtime")

df.select(
    "ip",
    "time",
    f.count("*").over(my_window).alias("cnt")
)

Examples of functions used with windows: `first`, `last`, `lag`, `lead`, `row_number`, `min`, `max`, `sum`

In [None]:
# Count activity sessions per ip. A row ends a session when the next event
# (lead) is at least 18000 s (5 h) later, or when there is no next event.
# row_number and lag are computed only to show the functions; the second
# select drops them.
(
    df.select(
        "ip",
        "unixtime",
        f.row_number().over(my_window).alias("count"),
        f.lag("unixtime").over(my_window).alias("lag"),
        f.lead("unixtime").over(my_window).alias("lead"),
    )
    .select(
        "ip",
        "unixtime",
        (f.col("lead") - f.col("unixtime")).alias("diff"),
    )
    .filter("diff >= 18000 or diff is NULL")
    .groupBy("ip")
    .count()
)

A different kind of window: grouping rows into fixed time intervals with `window()`

In [None]:
(df
 # Keep SomeDate: window() below needs it, and selectExpr drops every unlisted column.
 .selectExpr("country", "(users * counts) as total_count", "ip", "SomeDate")
 # f.window buckets each row into a 1-day interval based on SomeDate
 # (which must be a timestamp column).
 .groupBy(f.col("country"), f.window(f.col("SomeDate"), "1 day"))
 .sum("total_count")
 .show(5)
)

### Pivot table 

`pivot(column, [values])` is called between `groupBy` and an aggregation such as `count`

In [None]:
# One row per ip and one column per URL in top_url_list, i.e.
# len(top_url_list) + 1 columns including ip; missing ip/URL pairs become 0.
(df.groupBy("ip")
   .pivot("url", top_url_list)
   .count()
   .fillna(0))

### Working with HIVE

#### Create temporary views

In [None]:
# Register df as a session-scoped temporary view "geoip" so SQL can query it.
# Unlike createTempView, createOrReplaceTempView does not fail when the view
# already exists, so the cell can be re-run.
df.createOrReplaceTempView("geoip")
# listTables also reports temporary views alongside the database's tables.
spark.catalog.listTables("web")

#### Create Hive tables

In [None]:
# CREATE TABLE ... AS SELECT: persist the contents of the "geoip" view as a
# table in the web database (the select needs a column list, here `*`).
spark.sql("create table web.geoip as select * from geoip")
# See table created
spark.catalog.listTables("web")

### Writing files

In [None]:
# Create a table in the metastore; mode 'overwrite' replaces an existing table
# (other modes: error, append, ignore).
df.write.mode('overwrite').saveAsTable("table_name")

In [None]:
# Save with the default data source format (parquet unless
# spark.sql.sources.default is changed) and the default save mode,
# which fails if the path already exists.
df.write.save("table_name")

In [None]:
df.write.format('csv').mode('overwrite').save("table_name")  # or .format('json')

In [None]:
df.write.mode('overwrite').parquet("table_name")

In [None]:
df.write.mode('overwrite').csv("table_name")

### Connecting to an external DB

In [None]:
# JDBC URL of a local MySQL "demo" database, with user/password as URL parameters.
# NOTE(review): presumably needs the MySQL JDBC driver on the Spark classpath.
connection_string = "jdbc:mysql://localhost:3306/demo?user=demo&password=demo"

# Write df to the "geoip" table (default save mode: fail if it already exists).
df.write.jdbc(url=connection_string, table="geoip")

# Read the "geoip" table back as a new DataFrame.
new_df = spark.read.jdbc(url=connection_string, table="geoip")