In [1]:
1+1

In [2]:
from pyspark.sql.functions import lit,col,concat, substring
NO_OF_ROWS = 100000
dfIds = spark.range(NO_OF_ROWS).withColumn("IDAsString", concat(lit("Str="), col("id")))
display(dfIds)

id,IDAsString
0,Str=0
1,Str=1
2,Str=2
3,Str=3
4,Str=4
5,Str=5
6,Str=6
7,Str=7
8,Str=8
9,Str=9


In [3]:
path = "/tmp/parquet/10MIDs"
dfIds.write.mode("Overwrite").parquet(path)

In [4]:
dfBeforeAppend = spark.read.parquet(path)
print("No Rows before append:", dfBeforeAppend.count())

In [5]:
%sql
-- DROP IF EXIST
DROP TABLE IF EXISTS IDS;
-- Skapa en tabell som pekar på PARQUET
CREATE TABLE IDS
    USING parquet
    OPTIONS (
      path "/tmp/parquet/10MIDs"
    )

In [6]:
%sql
SELECT COUNT(*) FROM IDS

count(1)
100000


In [7]:
# Append Rows
NO_APPEEND_NEW_ROWS = 10000
dfAppendRows = spark.range(NO_OF_ROWS + NO_APPEEND_NEW_ROWS).filter("id >= {rows}".format(rows=NO_OF_ROWS)).withColumn("IDAsString", f.concat(f.lit("Str="), f.col("id")))
dfAppendRows.write.mode("Append").parquet(path)
dfAfterAppend = spark.read.parquet(path)
print("NoROws after", dfAfterAppend.count())

In [8]:
%sql
SELECT COUNT(*) FROM IDS

count(1)
100000


In [9]:
%sql
REFRESH TABLE IDS

In [10]:
%sql
SELECT COUNT(*) FROM IDS

count(1)
110000


# Append data to partitions using parquet

In [12]:
# Create a col to partition on
# Just take last digit in id as partition id
dfId = spark.read.parquet(path)
dfId3 = dfId.withColumn("PartitionId", f.substring(f.col("id"), -1, 1))
# substring(column, -1, 1)
display(dfId3)

id,IDAsString,PartitionId
50000,Str=50000,0
50001,Str=50001,1
50002,Str=50002,2
50003,Str=50003,3
50004,Str=50004,4
50005,Str=50005,5
50006,Str=50006,6
50007,Str=50007,7
50008,Str=50008,8
50009,Str=50009,9


In [13]:
# Write IDS partitioned on last digit
pathPar = "/tmp/parquet/IDSPartitioned/"
dfId3.write.partitionBy("PartitionId").mode("Overwrite").parquet(pathPar)

In [14]:
%sql
DROP TABLE IF EXISTS IDPARTITIONED;
-- Create table on top of partitioned data
CREATE TABLE IDPARTITIONED (id long, IDAsString string, PartitionId int)
    USING parquet
    OPTIONS (
      path "/tmp/parquet/IDSPartitioned/"
    )
    partitioned by (PartitionId)
    -- When the table schema is not provided, schema and partition columns will be inferred

In [15]:
%sql
MSCK REPAIR TABLE IDPARTITIONED

In [16]:
%sql
select count(*), PartitionId from IDPARTITIONED group by PartitionId order by PartitionId


count(1),PartitionId
11000,0
11000,1
11000,2
11000,3
11000,4
11000,5
11000,6
11000,7
11000,8
11000,9


In [17]:
%fs
ls /mnt/datasetsneugen2/parquet/IDSPartitioned/

path,name,size
dbfs:/mnt/datasetsneugen2/parquet/IDSPartitioned/*/,*/,0
dbfs:/mnt/datasetsneugen2/parquet/IDSPartitioned/PartitionId=0/,PartitionId=0/,0
dbfs:/mnt/datasetsneugen2/parquet/IDSPartitioned/PartitionId=1/,PartitionId=1/,0
dbfs:/mnt/datasetsneugen2/parquet/IDSPartitioned/PartitionId=2/,PartitionId=2/,0
dbfs:/mnt/datasetsneugen2/parquet/IDSPartitioned/PartitionId=3/,PartitionId=3/,0
dbfs:/mnt/datasetsneugen2/parquet/IDSPartitioned/PartitionId=4/,PartitionId=4/,0
dbfs:/mnt/datasetsneugen2/parquet/IDSPartitioned/PartitionId=5/,PartitionId=5/,0
dbfs:/mnt/datasetsneugen2/parquet/IDSPartitioned/PartitionId=6/,PartitionId=6/,0
dbfs:/mnt/datasetsneugen2/parquet/IDSPartitioned/PartitionId=7/,PartitionId=7/,0
dbfs:/mnt/datasetsneugen2/parquet/IDSPartitioned/PartitionId=8/,PartitionId=8/,0


In [18]:
dfReadPar = spark.read.parquet(pathPar)
display(dfReadPar.groupBy("PartitionId").count())

PartitionId,count
1,11000
3,11000
5,11000
4,11000
2,11000
0,11000
9,11000
6,11000
8,11000
7,11000


In [19]:
# filter out ids with partition 0 and 1 and append to original parquet
dfFilter01 = dfId3.filter("PartitionId == 0 or PartitionId == 1")
# append these rows to original parquet
dfFilter01.write.partitionBy("PartitionId").mode("Append").parquet(pathPar)
# Test result
dfReadPar = spark.read.parquet(pathPar)
display(dfReadPar.groupBy("PartitionId").count())

PartitionId,count
6,11000
5,11000
9,11000
4,11000
8,11000
7,11000
2,11000
1,22000
3,11000
0,22000


In [20]:
%sql
-- check table
select count(*), PartitionId from IDPARTITIONED group by PartitionId order by PartitionId

count(1),PartitionId
11000,0
11000,1
11000,2
11000,3
11000,4
11000,5
11000,6
11000,7
11000,8
11000,9


In [21]:
%sql
MSCK REPAIR TABLE IDPARTITIONED;
select count(*), PartitionId from IDPARTITIONED group by PartitionId order by PartitionId
-- We should now be able to see the changes

count(1),PartitionId
22000,0
22000,1
11000,2
11000,3
11000,4
11000,5
11000,6
11000,7
11000,8
11000,9


#What if we tried to update specific rows?
Parquet does not allow updates. That means we would have some options:
- Rewritee all data
- Read exisitng data and create a new updated version of the data and rewrite all data
- FIgure out in what partition we want to change and do the above for only the partition

We have our data partitioned on the Partition ID.

In [23]:
%sql
select * from IDPARTITIONED where id < 30

id,IDAsString,PartitionId
3,Str=3,3
13,Str=13,3
23,Str=23,3
9,Str=9,9
19,Str=19,9
29,Str=29,9
2,Str=2,2
12,Str=12,2
22,Str=22,2
6,Str=6,6


In [24]:
# What if we would like to update the row where id = 50013 or id = 50033 and we want to add a string fixed to the IDAsString
# Both are in partition 3 so we can read the partion 3 only and write the update back

dfPartition = spark.sql("select * from IDPARTITIONED where PartitionId = 3")
dfPartition.show(5)

In [25]:
# Some logic for update
dfChange = dfPartition.filter("id = 50013 or id = 50033")
dfChange.show()

dfFixed = dfChange.withColumn("IDAsString", f.concat(f.col("IDAsString"), f.lit("_fixed")))
dfFixed.show()

In [26]:
# Take original data and put chnages in and write back
# Here dropping rows that should be updated and then adding by union. (Remember there is no update - a DF is immuatble)
# Get a new DF with the rows that should be updated
dfFiltered = dfPartition.filter("id != 50033 and id != 50013")
# Add the updated rows to a new DF
dfUpdatedDF = dfFiltered.union(dfFixed)
display(dfUpdatedDF.filter("id > 50012").orderBy("id"))

id,IDAsString,PartitionId
50013,Str=50013_fixed,3
50023,Str=50023,3
50033,Str=50033_fixed,3
50043,Str=50043,3
50053,Str=50053,3
50063,Str=50063,3
50073,Str=50073,3
50083,Str=50083,3
50093,Str=50093,3
50103,Str=50103,3


In [27]:
# Overwrite the correct parition with our new DF containing all data fro partion 3
dfUpdatedDF.write.mode("overwrite").parquet(pathPar + "PartitionId={partion_name}/".format(partion_name="3"))

In [28]:
# The rows in partitio 3 are updated in our final table.
display(spark.read.parquet(pathPar).filter("id < 50015 and id > 50011 or id < 50034 and id > 50032").orderBy("id"))

id,IDAsString,PartitionId
50012,Str=50012,2
50013,Str=50013_fixed,3
50014,Str=50014,4
50033,Str=50033_fixed,3


#### Note - It´s easier to append data as you can write append to a specific partition as we showed above. The updates are harder as you are forces to rewrite at least the full partition.

#### Note 2 - DELTA - By using the delta file format you can do upserts into different partitions much easier and not risking ovwerwriting your data by using time travel and ACID transactions.
Also table meta-data will be updated and in sync when using delta.