In [0]:
import pyspark
import os
import pandas
import distutils
import csv
import sqlite3
from sqlalchemy import create_engine


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import *
from functools import reduce

In [0]:
# Reuse the active Spark session if one exists, otherwise start one,
# under the application name "JEH".
spark = (
    SparkSession.builder
    .appName("JEH")
    .getOrCreate()
)

In [0]:
# Load the uploaded CSV from DBFS. The first row is the header, a double quote
# is the escape character, and a single record may span several lines.
raw_df = (
    spark.read
    .option("header", True)
    .option("escape", '"')
    .option("multiLine", True)
    .csv("dbfs:/FileStore/shared_uploads/valarmathy.v@blackstraw.ai/large_array_data.csv")
)

In [0]:
# Bronze layer: persist the untouched input as a Delta dataset, replacing
# whatever an earlier run left at this location.
(
    raw_df.write
    .format("delta")
    .mode("overwrite")
    .save("dbfs:/FileStore/tables/delta/large_array_file/bronze/array_raw")
)

# Make sure the schema exists, then register the Delta location as a table.
spark.sql("CREATE SCHEMA IF NOT EXISTS bronze")
spark.sql(
    "create table if not exists bronze.array_raw using delta "
    "location 'dbfs:/FileStore/tables/delta/large_array_file/bronze/array_raw'"
)

Out[45]: DataFrame[]

In [0]:
# TRANSFORMATION LOGIC 1: find the columns that hold array-like strings.
# A column qualifies when at least one of its values starts with "[" and
# ends with "]".
#
# Every column is tested inside ONE aggregation, so the data is scanned once.
# Sorting the frame beforehand is unnecessary: row order cannot change
# whether a matching value exists.
array_cols = []
if raw_df.columns:  # agg() needs at least one expression
    bracketed_counts = raw_df.agg(*[
        # when() yields NULL for non-matching (or NULL) values and count()
        # skips NULLs, so this is the number of bracketed values per column.
        F.count(F.when(F.col(c).startswith("[") & F.col(c).endswith("]"), True))
        for c in raw_df.columns
    ]).first()
    # The aggregated row is positional: one count per column, in column order.
    array_cols = [c for c, n in zip(raw_df.columns, bracketed_counts) if n > 0]
print(array_cols)
print(len(array_cols))


['message_follower_ids']
1


In [0]:
# TRANSFORMATION LOGIC 2: expand every array-like string column into one
# column per element (<col>_1, <col>_2, ...), keeping the original columns.
tmpdf = raw_df

final_columns = []
for c in tmpdf.columns:
    # Step 1: every original column is kept unchanged, in its original position.
    final_columns.append(F.col(c))
    if c not in array_cols:
        continue

    # Step 2: parse the string into a real array inside a temporary column.
    tmpcol_name = f"{c}_tmp"
    tmpdf = tmpdf.withColumn(tmpcol_name, F.from_json(F.col(c), ArrayType(StringType())))

    # Step 3: the longest parsed array decides how many element columns to emit.
    # size() of a NULL array is -1 or NULL depending on the Spark configuration,
    # so the maximum can come back as None; fall back to 0 so that range()
    # never receives None (a negative value simply yields no columns).
    max_len = tmpdf.agg(F.max(F.size(F.col(tmpcol_name)))).first()[0] or 0

    for i in range(max_len):
        # Rows with shorter arrays get NULL in the trailing element columns.
        final_columns.append(F.col(tmpcol_name).getItem(i).alias(f"{c}_{i+1}"))

# Step 4: select the final columns; the temporary array columns are dropped
# because they are not part of the selection.
finaldf = tmpdf.select(*final_columns)


In [0]:
# Silver layer: store the frame with the expanded array columns as a Delta
# dataset, overwriting the output of any previous run.
(
    finaldf.write
    .format("delta")
    .mode("overwrite")
    .save("dbfs:/FileStore/tables/delta/large_array_file/silver/array_splitted_data")
)

# Make sure the schema exists, then register the Delta location as a table.
spark.sql("create schema if not exists silver")
spark.sql(
    "create table if not exists silver.array_splitted_data using delta "
    "location 'dbfs:/FileStore/tables/delta/large_array_file/silver/array_splitted_data'"
)

Out[48]: DataFrame[]

In [0]:

# Also export the silver frame as comma-separated text with a header row,
# replacing any previous export at the same path.
(
    finaldf.write
    .mode("overwrite")
    .option("header", True)
    .option("sep", ",")
    .csv("dbfs:/FileStore/tables/delta/large_array_file/silver/array_splitted_data.csv")
)


In [0]:
%sql
-- Followers per user: each row contributes the number of comma-separated
-- entries in message_follower_ids (NULL or '[]' contributes 0), and the
-- contributions are summed per name.
SELECT
  name,
  SUM(
    CASE
      WHEN message_follower_ids IS NULL OR message_follower_ids = '[]'
        THEN 0
      -- strip brackets and whitespace, then count the comma-separated pieces
      ELSE SIZE(SPLIT(REGEXP_REPLACE(message_follower_ids, '[\\[\\]\\s]', ''), ','))
    END
  ) AS Follower_count
FROM silver.array_splitted_data
GROUP BY name


name,Follower_count
User_70,246
User_44,270
User_38,206
User_55,266
User_81,233
User_10,248
User_83,281
User_9,207
User_98,355
User_85,260


Databricks visualization. Run in Databricks to view.

In [0]:
# Saving business use data into gold layer
# Gold layer: followers per user, computed from the silver table.
# Each row contributes the number of comma-separated entries in
# message_follower_ids (NULL or '[]' contributes 0), summed per name.
#
# The query is a RAW Python string on purpose: the SQL literal must reach
# Spark as '[\\[\\]\\s]' so that, after Spark SQL's own backslash unescaping,
# the regex is [\[\]\s] (brackets and whitespace). Without the backslashes
# the pattern degrades to '[]s]', which no longer strips '[' or whitespace
# and removes the letter 's' instead.
user_follower_count = spark.sql(r"""SELECT 
  name, 
  sum(
  CASE 
    WHEN message_follower_ids IS NULL OR message_follower_ids = '[]' THEN 0
    ELSE SIZE(SPLIT(REGEXP_REPLACE(message_follower_ids, '[\\[\\]\\s]', ''), ',')) 
  END) AS Follower_count
FROM silver.array_splitted_data
group by name""")
display(user_follower_count)

# Persist the aggregate as a Delta dataset, replacing any previous run's output.
(
    user_follower_count.write
    .format("delta")
    .mode("overwrite")
    .save("dbfs:/FileStore/tables/delta/large_array_file/gold/user_follower_count")
)

# Make sure the schema exists, then register the Delta location as a table.
spark.sql("create schema if not exists gold")
spark.sql("create table if not exists gold.user_follower_count using delta location 'dbfs:/FileStore/tables/delta/large_array_file/gold/user_follower_count'")

name,Follower_count
User_70,246
User_44,270
User_38,206
User_55,266
User_81,233
User_10,248
User_83,281
User_9,207
User_98,355
User_85,260


Out[51]: DataFrame[]

In [0]:
%sql
-- describe history of a table
-- DESCRIBE HISTORY silver.array_splitted_data;
-- show tables in a schema
-- SHOW TABLES in gold;
-- drop table
-- drop table if exists "gold.user_follower_count"
-- remove the table's underlying files (dbutils call: run it in a Python cell, not in %sql)
-- dbutils.fs.rm("dbfs:/FileStore/tables/delta/gold.user_follower_count", recurse=True)
