<h3 align="center"><b> PySpark SQL Test </b></h3>

---

- Taking a random dataset with ~800k rows
- Scaling it up to ~105m rows (7 doublings of the original, i.e. ×128)
- Spark SQL queries 
    - Value counts on each column (excl. key col)
    - Joining count columns into single table 
- Transferring from Spark DF to Pandas DF  

In [1]:
# Standard library: regex matching for column selection, wall-clock timing.
import re, pyspark
from time import time

# Spark entry points, column functions (F) and the integer type used for casts.
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import IntegerType

# Record the PySpark version the notebook was run with.
print(pyspark.__version__)

3.2.2


In [2]:
# Spark configuration for this benchmark: local master, app name "dev",
# and 2000 partitions for both SQL shuffles and default RDD parallelism.
# NOTE(review): with a "local" master the executor presumably runs inside the
# driver JVM, so `spark.executor.memory` may have no effect here and
# `spark.driver.memory` would be the relevant knob - confirm before tuning.
conf = (SparkConf()
         .setMaster("local")
         .setAppName("dev")
         .set("spark.executor.memory", "8g")
         .set("spark.sql.shuffle.partitions", "2000")
         .set("spark.default.parallelism", "2000")
         )

# Use getOrCreate() instead of constructing SparkContext/SparkSession directly:
# constructing a second SparkContext in a live kernel raises, so the original
# cell could not be re-run without restarting. getOrCreate() returns the
# already-running session when there is one, making this cell safe to re-run.
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Keep `sc` available under the same name for any code that uses the context.
sc = spark.sparkContext

In [3]:
# Read the source CSV (first line is the header; column types are inferred by Spark).
df = spark.read.csv('../data/random.csv', header=True, inferSchema=True)

# Summarise the loaded frame: row count, column count, then the inferred schema.
row_total = df.count()
column_total = len(df.columns)

print(f"Rows: {row_total}")
print(f"Columns: {column_total}")
print("Schema:")
df.printSchema()

Rows: 819200
Columns: 6
Schema:
root
 |-- iter: integer (nullable = true)
 |-- t1: integer (nullable = true)
 |-- t2: integer (nullable = true)
 |-- t3: integer (nullable = true)
 |-- t4: integer (nullable = true)
 |-- t5: integer (nullable = true)



In [4]:
# Scale up DF: union the frame with itself repeatedly. Every pass doubles the
# row count, so 7 passes multiply it by 2**7 = 128.
# Note: `df` is rebound here, so re-running this cell doubles the already
# scaled frame again.
passes_left = 7
while passes_left > 0:
    df = df.union(df)
    passes_left -= 1

print(f"Rows: {df.count()}")

Rows: 104857600


In [5]:
# Spark SQL variant: count the occurrences of every value in each `t*` column
# and combine the per-column counts into one table keyed by `value`.
t = time()
df.createOrReplaceTempView('table')

# Columns to profile: every column whose name starts with 't'
# (re.match anchors at the start of the name).
iter_cols = [name for name in df.columns if re.match('t', name)]
if not iter_cols:
    raise ValueError("No columns starting with 't' to count")

q = None
for col in iter_cols:
    if q is None:
        # First column: its value counts seed the result table.
        q = spark.sql(f'''
            SELECT {col} AS value, COUNT({col}) AS {col}
            FROM table
            GROUP BY {col}
            ''')
    else:
        # Later columns: full-join their counts onto the running result.
        # USING (value) keeps a single key column that is the coalesce of both
        # sides. The previous `ON query.value = tmp.value` compared against the
        # first column's key only, so a value missing from the first column but
        # present in several later columns never matched and ended up as
        # duplicate rows in the final table.
        q.createOrReplaceTempView('query')
        q = spark.sql(f'''
            WITH tmp AS (
                SELECT {col} AS value, COUNT({col}) AS {col}
                FROM table
                GROUP BY {col}
            )
            SELECT *
            FROM query
            FULL JOIN tmp USING (value)
            ''')

# Keep the key column as an integer, as the original cell did.
q = q.withColumn('value', F.col('value').cast(IntegerType()))

q.orderBy('value').show(truncate=False)
print(f'''Run time : {round(time()-t, 1)} s''')

+-----+-------+-------+-------+-------+-------+
|value|t1     |t2     |t3     |t4     |t5     |
+-----+-------+-------+-------+-------+-------+
|1    |1064960|null   |null   |null   |null   |
|2    |991232 |1064960|null   |null   |null   |
|3    |958464 |991232 |1064960|null   |null   |
|4    |1130496|958464 |991232 |1064960|null   |
|5    |1040384|1130496|958464 |991232 |1064960|
|6    |958464 |1040384|1130496|958464 |991232 |
|7    |1212416|958464 |1040384|1130496|958464 |
|8    |1032192|1212416|958464 |1040384|1130496|
|9    |983040 |1032192|1212416|958464 |1040384|
|10   |1146880|983040 |1032192|1212416|958464 |
|11   |1040384|1146880|983040 |1032192|1212416|
|12   |1064960|1040384|1146880|983040 |1032192|
|13   |999424 |1064960|1040384|1146880|983040 |
|14   |1179648|999424 |1064960|1040384|1146880|
|15   |1015808|1179648|999424 |1064960|1040384|
|16   |933888 |1015808|1179648|999424 |1064960|
|17   |1081344|933888 |1015808|1179648|999424 |
|18   |1089536|1081344|933888 |1015808|1

In [6]:
# DataFrame-API variant of the same analysis: per-column value counts,
# outer-joined into a single table keyed by `value`.
t = time()

# Columns to profile: every column whose name starts with 't'.
iter_cols = [name for name in df.columns if re.match('t', name)]
if not iter_cols:
    raise ValueError("No columns starting with 't' to count")

# One two-column frame per profiled column: `value` (the distinct values) and
# a count column named after the source column.
count_frames = [
    df.groupBy(name).count()
      .withColumnRenamed(name, 'value')
      .withColumnRenamed('count', name)
    for name in iter_cols
]

# Outer-join on the column *name* 'value': the join key then appears once in
# the result, already coalesced from both sides, so no manual coalesce of the
# key is needed after each join.
q = count_frames[0]
for counts in count_frames[1:]:
    q = q.join(counts, on='value', how='outer')

# Keep the key column as an integer, as the original cell did.
q = q.withColumn('value', F.col('value').cast(IntegerType()))

q.orderBy('value').show()
print(f'''Run time : {round(time()-t, 1)} s''')

+-----+-------+-------+-------+-------+-------+
|value|     t1|     t2|     t3|     t4|     t5|
+-----+-------+-------+-------+-------+-------+
|    1|1064960|   null|   null|   null|   null|
|    2| 991232|1064960|   null|   null|   null|
|    3| 958464| 991232|1064960|   null|   null|
|    4|1130496| 958464| 991232|1064960|   null|
|    5|1040384|1130496| 958464| 991232|1064960|
|    6| 958464|1040384|1130496| 958464| 991232|
|    7|1212416| 958464|1040384|1130496| 958464|
|    8|1032192|1212416| 958464|1040384|1130496|
|    9| 983040|1032192|1212416| 958464|1040384|
|   10|1146880| 983040|1032192|1212416| 958464|
|   11|1040384|1146880| 983040|1032192|1212416|
|   12|1064960|1040384|1146880| 983040|1032192|
|   13| 999424|1064960|1040384|1146880| 983040|
|   14|1179648| 999424|1064960|1040384|1146880|
|   15|1015808|1179648| 999424|1064960|1040384|
|   16| 933888|1015808|1179648| 999424|1064960|
|   17|1081344| 933888|1015808|1179648| 999424|
|   18|1089536|1081344| 933888|1015808|1

In [7]:
# Shut down the Spark session created at the top of the notebook.
spark.stop()