# Getting Started with PySpark in Google Colab

PySpark is the Python interface for Apache Spark. Its primary use cases are working with very large data sets and building data pipelines.

You don't need to work with big data to benefit from PySpark. I find that Spark SQL is a great tool for routine data analysis. Pandas can get slow, and you may find yourself writing a lot of data-cleaning code for operations that take much less code in SQL. Let's get started!

See more here! http://spark.apache.org/docs/latest/api/python/

# 1. Installing PySpark in Google Colab

In [None]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#Check this site for the latest download link https://www.apache.org/dyn/closer.lua/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
!pip install py4j

import os
import sys
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"


import findspark
findspark.init()
findspark.find()

import pyspark

from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F

spark= SparkSession \
       .builder \
       .appName("PySpark Book Ch. 2") \
       .getOrCreate()

spark

In [None]:
spark

# 2. Reading Data

For this example, I am going to use a publicly available data set in a CSV format.

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# import requests
# path = "https://raw.githubusercontent.com/owid/covid-19-data/master/public/data/owid-covid-data.csv"
# req = requests.get(path)
# url_content = req.content

# csv_file_name = 'owid-covid-data.csv'
# csv_file = open(csv_file_name, 'wb')

# csv_file.write(url_content)
# csv_file.close()
file_path = '/content/drive/MyDrive/Colab_Notebooks/PySpark Data Analysis/Advanced Analytics With PySpark - OReilly/donation/'

df = spark.read.csv(file_path+'block_*.csv', header=True, inferSchema=True)

# 3. PySpark DataFrames

In [None]:
#Viewing the dataframe schema
df.printSchema()

root
 |-- id_1: integer (nullable = true)
 |-- id_2: integer (nullable = true)
 |-- cmp_fname_c1: string (nullable = true)
 |-- cmp_fname_c2: string (nullable = true)
 |-- cmp_lname_c1: double (nullable = true)
 |-- cmp_lname_c2: string (nullable = true)
 |-- cmp_sex: integer (nullable = true)
 |-- cmp_bd: string (nullable = true)
 |-- cmp_bm: string (nullable = true)
 |-- cmp_by: string (nullable = true)
 |-- cmp_plz: string (nullable = true)
 |-- is_match: boolean (nullable = true)



In [None]:
df.show()

+-----+-----+------------+------------+------------+------------+-------+------+------+------+-------+--------+
| id_1| id_2|cmp_fname_c1|cmp_fname_c2|cmp_lname_c1|cmp_lname_c2|cmp_sex|cmp_bd|cmp_bm|cmp_by|cmp_plz|is_match|
+-----+-----+------------+------------+------------+------------+-------+------+------+------+-------+--------+
| 3148| 8326|           1|           ?|         1.0|           ?|      1|     1|     1|     1|      1|    true|
|14055|94934|           1|           ?|         1.0|           ?|      1|     1|     1|     1|      1|    true|
|33948|34740|           1|           ?|         1.0|           ?|      1|     1|     1|     1|      1|    true|
|  946|71870|           1|           ?|         1.0|           ?|      1|     1|     1|     1|      1|    true|
|64880|71676|           1|           ?|         1.0|           ?|      1|     1|     1|     1|      1|    true|
|25739|45991|           1|           ?|         1.0|           ?|      1|     1|     1|     1|      1|  

In [None]:
df = df.replace('?','')
df.show()

+-----+-----+------------+------------+------------+------------+-------+------+------+------+-------+--------+
| id_1| id_2|cmp_fname_c1|cmp_fname_c2|cmp_lname_c1|cmp_lname_c2|cmp_sex|cmp_bd|cmp_bm|cmp_by|cmp_plz|is_match|
+-----+-----+------------+------------+------------+------------+-------+------+------+------+-------+--------+
| 3148| 8326|           1|            |         1.0|            |      1|     1|     1|     1|      1|    true|
|14055|94934|           1|            |         1.0|            |      1|     1|     1|     1|      1|    true|
|33948|34740|           1|            |         1.0|            |      1|     1|     1|     1|      1|    true|
|  946|71870|           1|            |         1.0|            |      1|     1|     1|     1|      1|    true|
|64880|71676|           1|            |         1.0|            |      1|     1|     1|     1|      1|    true|
|25739|45991|           1|            |         1.0|            |      1|     1|     1|     1|      1|  

In [None]:
df.printSchema()

root
 |-- id_1: integer (nullable = true)
 |-- id_2: integer (nullable = true)
 |-- cmp_fname_c1: string (nullable = true)
 |-- cmp_fname_c2: string (nullable = true)
 |-- cmp_lname_c1: double (nullable = true)
 |-- cmp_lname_c2: string (nullable = true)
 |-- cmp_sex: integer (nullable = true)
 |-- cmp_bd: string (nullable = true)
 |-- cmp_bm: string (nullable = true)
 |-- cmp_by: string (nullable = true)
 |-- cmp_plz: string (nullable = true)
 |-- is_match: boolean (nullable = true)



In [None]:
df.count()

5749132

In [None]:
#Summary stats
df.describe().show()

+-------+------------------+------------------+------------------+------------------+-------------------+-------------------+-------------------+-------------------+-------------------+------------------+-------------------+
|summary|              id_1|              id_2|      cmp_fname_c1|      cmp_fname_c2|       cmp_lname_c1|       cmp_lname_c2|            cmp_sex|             cmp_bd|             cmp_bm|            cmp_by|            cmp_plz|
+-------+------------------+------------------+------------------+------------------+-------------------+-------------------+-------------------+-------------------+-------------------+------------------+-------------------+
|  count|           5749132|           5749132|           5749132|           5749132|            5749132|            5749132|            5749132|            5749132|            5749132|           5749132|            5749132|
|   mean| 33324.48559643438| 66587.43558331935|0.7129024704425707| 0.900017671890335|0.3156278193076

In [None]:
df.cache()

DataFrame[id_1: int, id_2: int, cmp_fname_c1: string, cmp_fname_c2: string, cmp_lname_c1: double, cmp_lname_c2: string, cmp_sex: int, cmp_bd: string, cmp_bm: string, cmp_by: string, cmp_plz: string, is_match: boolean]

Let's get the value counts for the `is_match` column...

In [None]:
df.groupBy('is_match').count().show()

+--------+-------+
|is_match|  count|
+--------+-------+
|    true|  20931|
|   false|5728201|
+--------+-------+



In [None]:
df.agg(F.avg('cmp_sex'),F.stddev('cmp_sex')).show()

+-----------------+-------------------+
|     avg(cmp_sex)|    stddev(cmp_sex)|
+-----------------+-------------------+
|0.955001381078048|0.20730111116897443|
+-----------------+-------------------+



# 4. Spark SQL

What I really like about the SQL module is how approachable it makes working with your data while still using Spark. There is less to learn, since it is essentially the same SQL syntax you may already be comfortable with.

In [None]:
#Creating a table from the dataframe
df.createOrReplaceTempView("some_random_data") #temporary view
# df.write.saveAsTable("covid_data") #Save as a table
# df.write.mode("overwrite").saveAsTable("covid_data") #Save as a table and overwrite the table if it exists

In [None]:
df2 = spark.sql("SELECT * from some_random_data WHERE is_match = true")
df2.printSchema()
df2.show()

root
 |-- id_1: integer (nullable = true)
 |-- id_2: integer (nullable = true)
 |-- cmp_fname_c1: string (nullable = true)
 |-- cmp_fname_c2: string (nullable = true)
 |-- cmp_lname_c1: double (nullable = true)
 |-- cmp_lname_c2: string (nullable = true)
 |-- cmp_sex: integer (nullable = true)
 |-- cmp_bd: string (nullable = true)
 |-- cmp_bm: string (nullable = true)
 |-- cmp_by: string (nullable = true)
 |-- cmp_plz: string (nullable = true)
 |-- is_match: boolean (nullable = true)

+-----+-----+------------+------------+------------+------------+-------+------+------+------+-------+--------+
| id_1| id_2|cmp_fname_c1|cmp_fname_c2|cmp_lname_c1|cmp_lname_c2|cmp_sex|cmp_bd|cmp_bm|cmp_by|cmp_plz|is_match|
+-----+-----+------------+------------+------------+------------+-------+------+------+------+-------+--------+
| 3148| 8326|           1|            |         1.0|            |      1|     1|     1|     1|      1|    true|
|14055|94934|           1|            |         1.0|          

# 5. Example with Another Data Set
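This example uses the California housing prices data set, downloaded from Kaggle with `kagglehub`. A similar file ships with every Google Colab session under `/content/sample_data/`, as in the commented-out cell below.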

In [None]:
import kagglehub

# Download latest version
path = kagglehub.dataset_download("camnugent/california-housing-prices")
print("Path to dataset files:", path)

Downloading from https://www.kaggle.com/api/v1/datasets/download/camnugent/california-housing-prices?dataset_version_number=1...


100%|██████████| 400k/400k [00:00<00:00, 45.8MB/s]

Extracting files...
Path to dataset files: /root/.cache/kagglehub/datasets/camnugent/california-housing-prices/versions/1





In [None]:
df = spark.read.csv(path, header=True, inferSchema=True)

In [None]:
# df = spark.read.csv("/content/sample_data/california_housing_train.csv", header=True, inferSchema=True)

In [None]:
df.printSchema()

root
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- housing_median_age: double (nullable = true)
 |-- total_rooms: double (nullable = true)
 |-- total_bedrooms: double (nullable = true)
 |-- population: double (nullable = true)
 |-- households: double (nullable = true)
 |-- median_income: double (nullable = true)
 |-- median_house_value: double (nullable = true)
 |-- ocean_proximity: string (nullable = true)



In [None]:
#print N rows
df.show(5)

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|ocean_proximity|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+---------------+
|  -122.23|   37.88|              41.0|      880.0|         129.0|     322.0|     126.0|       8.3252|          452600.0|       NEAR BAY|
|  -122.22|   37.86|              21.0|     7099.0|        1106.0|    2401.0|    1138.0|       8.3014|          358500.0|       NEAR BAY|
|  -122.24|   37.85|              52.0|     1467.0|         190.0|     496.0|     177.0|       7.2574|          352100.0|       NEAR BAY|
|  -122.25|   37.85|              52.0|     1274.0|         235.0|     558.0|     219.0|       5.6431|          341300.0|       NEAR BAY|
|  -122.25|   37.85|              

In [None]:
df.count()

20640

In [None]:
df.groupBy('ocean_proximity').agg(F.mean('median_house_value')).show()

+---------------+-----------------------+
|ocean_proximity|avg(median_house_value)|
+---------------+-----------------------+
|         ISLAND|               380440.0|
|     NEAR OCEAN|     249433.97742663656|
|       NEAR BAY|     259212.31179039303|
|      <1H OCEAN|     240084.28546409807|
|         INLAND|     124805.39200122119|
+---------------+-----------------------+



In [None]:
df.select("housing_median_age","total_rooms").show(5)

+------------------+-----------+
|housing_median_age|total_rooms|
+------------------+-----------+
|              41.0|      880.0|
|              21.0|     7099.0|
|              52.0|     1467.0|
|              52.0|     1274.0|
|              52.0|     1627.0|
+------------------+-----------+
only showing top 5 rows



In [None]:
df.describe().show()

+-------+-------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+---------------+
|summary|          longitude|         latitude|housing_median_age|       total_rooms|    total_bedrooms|        population|       households|     median_income|median_house_value|ocean_proximity|
+-------+-------------------+-----------------+------------------+------------------+------------------+------------------+-----------------+------------------+------------------+---------------+
|  count|              20640|            20640|             20640|             20640|             20433|             20640|            20640|             20640|             20640|          20640|
|   mean|-119.56970445736148| 35.6318614341087|28.639486434108527|2635.7630813953488| 537.8705525375618|1425.4767441860465|499.5396802325581|3.8706710029070246|206855.81690891474|           NULL|
| stddev|  2.0035317

In [None]:
df.select('total_rooms').distinct().show()

+-----------+
|total_rooms|
+-----------+
|      769.0|
|     2862.0|
|     3980.0|
|     1761.0|
|      692.0|
|     4800.0|
|      496.0|
|      934.0|
|     2734.0|
|     1051.0|
|      299.0|
|     2815.0|
|     4066.0|
|     7554.0|
|     5776.0|
|      305.0|
|     6433.0|
|    12467.0|
|     5983.0|
|     4142.0|
+-----------+
only showing top 20 rows



In [None]:
test = df.groupBy('total_rooms').agg(F.sum('housing_median_age'))

In [None]:
test.toPandas()

In [None]:
#Count the missing values in each column

df.select([F.count(F.when(F.isnull(c), c)).alias(c) for c in df.columns]).show()

# Creating a Test Spark DataFrame

In [None]:
data = [
        ('John','Smith',1),
        ('Jane','Smith',2),
        ('Jonas','Smith',3),
]

# columns = ["firstname","middlename","lastname","dob","gender","salary"]
columns = ["firstname","lastname","id"]
df = spark.createDataFrame(data=data, schema = columns)

In [None]:
df

# Spark Tips and Tricks

This is a collection of code snippets for common or tricky tasks.

## Pandas DataFrame to Spark DataFrame

In [None]:
import pandas as pd
import numpy as np

df = pd.DataFrame(np.random.randint(100,size=(1000, 3)),columns=['A','B','C'])
spark_df = spark.createDataFrame(df)
spark_df.show()

In [None]:
#Convert Object columns in pandas dataframe to a string
for i in df.select_dtypes(include='object').columns.tolist():
    df[i] = df[i].astype(str)

#Convert datetimes to UTC
for i in [col for col in df.columns if df[col].dtype == 'datetime64[ns]']:
    df[i] = pd.to_datetime(df[i], utc=True)

#Replace nan and "None" in pandas dataframe to null in the spark dataframe
spark_df = spark.createDataFrame(df).replace('None', None).replace(float('nan'), None)

## Window Functions

In [None]:
data = [
        (1,'2021-01-01 10:00:00'),
        (1,'2021-01-01 11:00:00'),
        (1,'2021-01-01 12:00:00'),
        (2,'2021-01-01 12:00:00'),
        (2,'2021-01-01 13:00:00'),
        (2,'2021-01-01 14:00:00'),
]

columns = ["id","datetime"]
df = spark.createDataFrame(data=data, schema = columns)
df.createOrReplaceTempView("window_test")
df.show()

In [None]:
#Selecting the min and max within each group.
#No ORDER BY in these two windows: with one, max() would be a running max up to the current row.
spark.sql('''
Select
  id,

  max(datetime) OVER (Partition BY id) as max_date,
  min(datetime) OVER (Partition BY id) as min_date,

  ROW_NUMBER() OVER (Partition BY id ORDER BY datetime) as row_number

  FROM window_test

''').show()
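
A note on window frames: when a window has an ORDER BY but no explicit frame, Spark SQL aggregates from the start of the partition up to the current row, so `max(datetime)` over such a window is a running maximum rather than the group maximum. The plain-Python sketch below imitates both behaviours on the sample rows. It does not use Spark, the helper names are made up for illustration, and it ignores ties in the ordering column (which Spark's default frame would treat as peers).

```python
from itertools import accumulate, groupby
from operator import itemgetter

rows = [
    (1, '2021-01-01 10:00:00'),
    (1, '2021-01-01 11:00:00'),
    (1, '2021-01-01 12:00:00'),
    (2, '2021-01-01 12:00:00'),
    (2, '2021-01-01 13:00:00'),
    (2, '2021-01-01 14:00:00'),
]

def running_max(rows):
    """Mimics max(datetime) OVER (PARTITION BY id ORDER BY datetime)."""
    out = []
    for key, group in groupby(sorted(rows), key=itemgetter(0)):
        values = [dt for _, dt in group]
        # accumulate(..., max) yields the largest value seen so far
        out.extend((key, dt, m) for dt, m in zip(values, accumulate(values, max)))
    return out

def partition_max(rows):
    """Mimics max(datetime) OVER (PARTITION BY id): one value per group."""
    out = []
    for key, group in groupby(sorted(rows), key=itemgetter(0)):
        values = [dt for _, dt in group]
        out.extend((key, dt, max(values)) for dt in values)
    return out

for row in running_max(rows):
    print("running  ", row)
for row in partition_max(rows):
    print("partition", row)
```

With the ORDER BY, the "max" of each row is simply that row's own datetime; without it, every row in a group gets the same value.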

In [None]:
# Selecting the row number or order rank for each row within a specified grouping.
# This is great for sub rankings in a table

spark.sql('''
Select
  id,
  datetime,

  ROW_NUMBER() OVER (Partition BY id ORDER BY datetime) as row_number

  FROM window_test

''').show()

## De-duplicate data by returning the most recently updated row using a window function

In [None]:
data = [
        (1,'2021-01-01',100,'A'),
        (1,'2021-01-31',105,'A'),
        (2,'2021-02-04',160,'B'),
        (2,'2021-02-07',145,'B'),
]

columns = ["id","date","score","type"]
df = spark.createDataFrame(data=data, schema = columns)
df.createOrReplaceTempView("window_test")
df.show()

In [None]:
df2 = spark.sql("""
WITH T AS (
  SELECT
  *,
  ROW_NUMBER() OVER (PARTITION BY id ORDER BY date DESC) AS version_number
  FROM window_test
)

SELECT * FROM T WHERE version_number = 1

""")

df2.show()

In [None]:
#Cumulative (running) sum of score within each type, ordered by date
spark.sql("""
  SELECT
  *,
  SUM(score) OVER (PARTITION by type ORDER BY date) as score_cumulative
  FROM window_test

""").show()

## Limit the number of results per group with a window function

In [None]:
import pandas as pd
import numpy as np

df = pd.DataFrame(
np.hstack((
    np.random.randint(1,5,size=(100000, 1)),
    np.random.randint(100,size=(100000, 1))
))
, columns=['company_id', 'number'])

dff = spark.createDataFrame(df)
dff.createOrReplaceTempView("window_test_limits")


In [None]:
spark.sql("""
WITH T AS (
  SELECT
    company_id,
    number,
    ROW_NUMBER() OVER (PARTITION BY company_id ORDER BY number) AS row_number
  FROM window_test_limits
    )

SELECT * FROM T WHERE row_number <= 100

""").show()

## Calculate a 7 day moving average

In [None]:
import random
import pandas as pd

df = pd.DataFrame(pd.date_range('1/1/2022','1/31/2022',freq='D'), columns=['date'])
df['company_id'] = 1
df['number'] = df.apply(lambda x: random.randint(0,100), axis = 1)

dff = spark.createDataFrame(df)
dff.createOrReplaceTempView("window_data")

dff.show()

In [None]:
spark.sql("""
SELECT
  date,
  company_id,
  number,
  AVG(number) OVER (PARTITION BY company_id ORDER BY date ASC RANGE BETWEEN INTERVAL 6 DAYS PRECEDING AND CURRENT ROW) as last_7_day_avg
FROM window_data
""").show()
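
To make the frame `RANGE BETWEEN INTERVAL 6 DAYS PRECEDING AND CURRENT ROW` concrete, here is a plain-Python sketch of the same calculation for a single company. The function name and the sample values are invented for illustration. The point is that the window is defined by the dates themselves (the current day and the six days before it), not by a fixed number of rows, so gaps in the data shrink the window.

```python
from datetime import date, timedelta

def trailing_7_day_avg(points):
    """points: list of (date, number) pairs for one company, in any order.
    Returns (date, average over that date and the 6 days before it)."""
    points = sorted(points)
    result = []
    for current_date, _ in points:
        window_start = current_date - timedelta(days=6)
        in_window = [n for d, n in points if window_start <= d <= current_date]
        result.append((current_date, sum(in_window) / len(in_window)))
    return result

# Hypothetical data: 10, 20, ..., 100 on the first ten days of January 2022
sample = [(date(2022, 1, day), day * 10) for day in range(1, 11)]
for d, avg in trailing_7_day_avg(sample):
    print(d, avg)
```

The first six rows average over fewer than seven days because there is no earlier data, which is also what the SQL version does.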

## Monthly Active Users

In [None]:
import random
import pandas as pd

df = pd.DataFrame(pd.date_range('1/1/2022','1/31/2022',freq='D'), columns=['login_date'])
df['company_id'] = 1
df['user_id'] = df.apply(lambda x: random.randint(0,3), axis = 1)

dff = spark.createDataFrame(df)
dff.createOrReplaceTempView("users_data")

dff.show()

In [None]:
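#Distinct users seen in the trailing 30 days, per company.
#The window is partitioned by company_id (not login_date) so the frame can span several days,
#and collect_set de-duplicates users because COUNT(DISTINCT ...) is not allowed in a window.
spark.sql("""
SELECT
  login_date,
  company_id,
  SIZE(COLLECT_SET(user_id) OVER (PARTITION BY company_id ORDER BY login_date ASC RANGE BETWEEN INTERVAL 30 DAYS PRECEDING AND CURRENT ROW)) AS monthly_active_users
  FROM users_data
""").show()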

## Find the time difference between related rows using a window function

In [None]:
data = [
        (1,'start','2021-01-01',100,'A'),
        (1,'end','2021-01-31',200,'A'),
        (2,'start','2021-03-05 4:53:11',100,'A'),
        (2,'end','2021-05-01 05:06:38',200,'A'),
]

columns = ["id","session","datetime","station_return","type"]
df = spark.createDataFrame(data=data, schema = columns)
df.createOrReplaceTempView("window_test")
df.show()

In [None]:
spark.sql('''
SELECT
  id,
  datetime,
  lead(datetime) OVER (PARTITION BY id ORDER BY datetime) as next_datetime,
  DATEDIFF(lead(datetime) OVER (PARTITION BY id ORDER BY datetime),datetime) as duration_in_days

FROM window_test

''').show()
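
`DATEDIFF` counts whole calendar days between two dates and ignores the time of day. The sketch below reproduces that arithmetic in plain Python for the sample rows, assuming Spark reads only the `yyyy-mm-dd` part of each string when it casts to a date. `days_between` is an illustrative helper, not a Spark function.

```python
from datetime import date

def days_between(end, start):
    """Calendar-day difference using only the yyyy-mm-dd part of each string."""
    end_date = date.fromisoformat(end[:10])
    start_date = date.fromisoformat(start[:10])
    return (end_date - start_date).days

# The start/end pairs from the sample data above
print(days_between('2021-01-31', '2021-01-01'))
print(days_between('2021-05-01 05:06:38', '2021-03-05 4:53:11'))
```

If you need the difference in hours or seconds instead, the strings would first have to be converted to proper timestamps.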

## Unpivoting

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType


data = [
        ('tim', 10, 9, 8, 5),
        ('john', 5, 6, 3, 6),
        ('jane', 7, 8, 9, 10),

]

schema = StructType([
   StructField("name", StringType(), True),
   StructField("experience", IntegerType(), True),
   StructField("satisfaction", IntegerType(), True),
   StructField("customer_service", IntegerType(), True),
   StructField("speed_of_service", IntegerType(), True)])


df = spark.createDataFrame(data, schema=schema)

df.show()

In [None]:
cols = ['experience', 'satisfaction', 'customer_service', 'speed_of_service']

exprs = f"""stack({len(cols)}, {", ".join([f"'{i}',{i}" for i in cols])}) as (question,score)"""

unpivotted_df = df.select("name",F.expr(exprs))

unpivotted_df.show()
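
The f-string above is dense, so it can help to look at the expression it generates. The hypothetical helper below (`build_stack_expr` is not part of PySpark) builds an equivalent string for any list of columns without needing a Spark session; the result is what gets passed to `F.expr`.

```python
def build_stack_expr(cols, name_col="question", value_col="score"):
    """Build a Spark SQL stack() expression that unpivots `cols` into two columns.

    Each column contributes a pair: its name as a string literal, then the
    column itself, so stack() emits one (name, value) row per column.
    """
    pairs = ", ".join(f"'{c}', {c}" for c in cols)
    return f"stack({len(cols)}, {pairs}) as ({name_col}, {value_col})"

cols = ['experience', 'satisfaction', 'customer_service', 'speed_of_service']
print(build_stack_expr(cols))
```

In the notebook it would be used as `df.select("name", F.expr(build_stack_expr(cols)))`.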

## Replace Values using a Dictionary

In [None]:
df = (spark
    .createDataFrame([
        (1, 'hello',3),
        (2, 'hello',5),
        (3, 'hello',5),
        (135246, 'hello',4),
        (54936, 'hello',4)
        ],
        ["id", "text","num"]))

In [None]:
mapping = {
1: 5555,
4:9999
}

In [None]:
#Apply the mapping to the id and num columns (there is no '_3' column in this dataframe)
df.replace(mapping, subset=['id', 'num']).show()

## Create a Date Range

In [None]:
date_range_df = spark.sql("SELECT explode(sequence(to_date('2018-01-01'), to_date('2018-03-01'), interval 1 day)) as date")
date_range_df.show()

## Concat Row Values after Grouping

In [None]:
df = (spark
    .createDataFrame([
        (1, 'hello',3),
        (2, 'hello',5),
        (3, 'hello',5),
        (3, 'hello',5),
        (3, 'hello',5),
        ],
        ["id", "text", "num"]))

df.createOrReplaceTempView("group_array")

df.show()

In [None]:
#Return every element
spark.sql("Select g.text, collect_list(g.id) FROM group_array as g GROUP BY 1").show()

In [None]:
#Return unique list
spark.sql("Select g.text, collect_set(g.id) FROM group_array as g GROUP BY 1").show()

## Rename Spark Columns with a Dictionary

In [None]:
col_dict = {
    'id':'ID',
    'test':'hello'
}

#Select only the columns to be renamed from a file
#(`path` is a placeholder: point it at a Parquet file or folder that contains these columns)
df = spark.read.parquet(path).select(list(col_dict))

#Rename the columns
for old_name, new_name in col_dict.items():
    df = df.withColumnRenamed(old_name, new_name)

df.createOrReplaceTempView("test")

df.show()

## Read Multiple Parquet Files into one Spark DataFrame

In [None]:
import glob

parquet_files = glob.glob('/content/*.parquet')
#The * is a wild card

df = spark.read.parquet(*parquet_files)

## Split and get last element in Spark SQL

In [None]:
spark.sql("""
SELECT
  "This.is.a.test" AS text,
  SPLIT("This.is.a.test",'[.]') AS split,
  REVERSE(SPLIT("This.is.a.test",'[.]'))[0] AS last_word
""").show()

## Handling NULL Values

In [None]:
df = (spark
    .createDataFrame([
        (1, 'hello',None),
        (2, 'hello',None),
        (3, 'hello',5),
        (3, 'hello',5),
        (3, 'hello',5),
        ],
        ["id", "text", "num"]))

df.createOrReplaceTempView("group_array")

df.show()

In [None]:
spark.sql("Select * from group_array where num IS NOT NULL").show()

## Using a JDBC Driver

In [None]:
!pip install JayDeBeApi
import jaydebeapi
import os

#Downloads the JDBC drivers
!wget https://repo1.maven.org/maven2/org/apache/hive/hive-jdbc/2.3.7/hive-jdbc-2.3.7-standalone.jar
!zip -q -d hive-jdbc-2.3.7-standalone.jar org/apache/logging/log4j/core/lookup/JndiLookup.class
!unzip hive-jdbc-2.3.7-standalone.jar > output.txt

!wget https://github.com/timveil/hive-jdbc-uber-jar/releases/download/v1.8-2.6.3/hive-jdbc-uber-2.6.3.0-235.jar


DRIVER_CLASS = 'org.apache.hive.jdbc.HiveDriver'
DRIVER_PATH = 'hive-jdbc-2.3.7-standalone.jar'
ASCEND_ENV = 'trial'
CONN_URL = 'jdbc:'

user = 'admin'
pw = 'admin'

conn = jaydebeapi.connect(DRIVER_CLASS,
                          CONN_URL,
                          [user, pw],
                          DRIVER_PATH)

# Regex

In [None]:
spark.sql("""
SELECT
  '(10) Strongly Agree' AS answer,
  regexp_extract('(10) Strongly Agree', '([0-9]+)', 1) AS score
""").show()