### Merging Data

In [44]:
# Load the three source tables used by the merging examples.
df = spark.read.table("MtoMActual")
dfadditional = spark.read.table("MtoMActualAdditional")
dfdates = spark.read.table("MtoMActualWithDates")

# DataFrame.union appends rows and keeps duplicates (like SQL UNION ALL).
dfunion = df.union(dfadditional)
display(dfunion)

# Persist the combined rows as a Delta table, overwriting any existing table.
(
    dfunion.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("MtoMActualCombined")
)

StatementMeta(, 51a658ec-c51b-4111-8c96-309e6f65b696, 51, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, ef6222f1-70bb-4b1e-8da6-4c01c6bfd8e1)

In [45]:
# Union the two frames, then call distinct() to drop fully identical rows
# (like a plain SQL UNION).
dfDistinctUnion = (
    df
    .union(dfadditional)
    .distinct()
)
display(dfDistinctUnion)

StatementMeta(, 51a658ec-c51b-4111-8c96-309e6f65b696, 52, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 2b632e59-6f8e-4151-8978-2b16ff9e49d3)

In [46]:
%%sql
-- UNION ALL keeps every row from both queries, duplicates included.
-- A plain UNION (without ALL) would remove duplicate rows instead.
SELECT *
FROM MtoMActual
UNION ALL
SELECT *
FROM MtoMActualAdditional

StatementMeta(, 51a658ec-c51b-4111-8c96-309e6f65b696, 53, Finished, Available, Finished)

<Spark SQL result set with 9 rows and 3 fields>

In [47]:
%%sql
-- Option 1: pad the narrower table with a NULL ColDate column so both sides
-- of the union return the same number of columns.
-- UNION ALL keeps duplicate rows; a plain UNION would remove them.
SELECT Country, Location, Actual, NULL as ColDate
FROM MtoMActual
UNION ALL
SELECT *
FROM MtoMActualWithDates;

-- Option 2: select only the three shared columns from the wider table.
SELECT *
FROM MtoMActual
UNION ALL
SELECT Country, Location, Actual 
FROM MtoMActualWithDates;

StatementMeta(, 51a658ec-c51b-4111-8c96-309e6f65b696, 55, Finished, Available, Finished)

<Spark SQL result set with 12 rows and 4 fields>

<Spark SQL result set with 12 rows and 3 fields>

In [48]:
# unionByName matches columns by name instead of by position. With
# allowMissingColumns=True, a column that exists in only one frame is kept
# and filled with nulls for the rows coming from the other frame.
dfDatesUnion = dfdates.unionByName(
    dfadditional,
    allowMissingColumns=True,
)
display(dfDatesUnion)


StatementMeta(, 51a658ec-c51b-4111-8c96-309e6f65b696, 56, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, fdf88c97-e046-47d1-8f8d-c925bb3dde7c)

### Identify and resolve duplicate data

In [49]:
# Load the combined table written by the plain (duplicate-keeping) union.
df = spark.read.table("MtoMActualCombined")

# Count rows per (Country, Location, Actual) and show only the combinations
# that occur more than once.
duplicate_counts = df.groupBy("Country", "Location", "Actual").count()
display(duplicate_counts.filter("count > 1"))

StatementMeta(, 51a658ec-c51b-4111-8c96-309e6f65b696, 57, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, eb4e89ff-ca79-43de-a559-a971163ed79b)

In [50]:
%%sql
-- List every (Country, Location, Actual) combination that appears more than once.
SELECT
    Country,
    Location,
    Actual
FROM MtoMActualCombined
GROUP BY
    Country,
    Location,
    Actual
HAVING COUNT(*) > 1

StatementMeta(, 51a658ec-c51b-4111-8c96-309e6f65b696, 58, Finished, Available, Finished)

<Spark SQL result set with 1 rows and 3 fields>

In [51]:
%%sql
-- Two ways to remove duplicate rows in SQL.
-- 1) SELECT DISTINCT over the columns of interest:
SELECT DISTINCT
    Country,
    Location,
    Actual
FROM MtoMActualCombined;

-- 2) GROUP BY every selected column:
SELECT
    Country,
    Location,
    Actual
FROM MtoMActualCombined
GROUP BY
    Country,
    Location,
    Actual;

StatementMeta(, 51a658ec-c51b-4111-8c96-309e6f65b696, 60, Finished, Available, Finished)

<Spark SQL result set with 8 rows and 3 fields>

<Spark SQL result set with 8 rows and 3 fields>

In [52]:
# distinct(): rows that are identical in every column collapse into one.
display(df.distinct())

# dropDuplicates(subset): one row is kept per (Country, Location) pair,
# even when the remaining columns differ.
display(df.dropDuplicates(subset=["Country", "Location"]))

# Neither call modifies df itself; it still contains the duplicates.
display(df)

StatementMeta(, 51a658ec-c51b-4111-8c96-309e6f65b696, 61, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 0604be89-bdfe-43fa-8403-8411b01bf4cc)

SynapseWidget(Synapse.DataFrame, 246bd901-8cc9-445d-bfc3-295e8e8e3be9)

SynapseWidget(Synapse.DataFrame, 1010caa8-bcc7-4d62-91b9-c8563b7cf4aa)

### Join data using inner, left, right, and full joins

In [53]:
# Read the target CSV, treating its first line as the header row.
dftarget = (
    spark.read
    .option("header", "true")
    .format("csv")
    .load("Files/MtoMTarget.csv")
)

# Persist it as a Delta table, overwriting any existing table.
(
    dftarget.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("MtoMTarget")
)

StatementMeta(, 51a658ec-c51b-4111-8c96-309e6f65b696, 62, Finished, Available, Finished)

In [54]:
# Load and preview the actuals.
dfactual = spark.read.table("MtoMActual")
display(dfactual)

# Load and preview the targets.
dftarget = spark.read.table("MtoMTarget")
display(dftarget)

StatementMeta(, 51a658ec-c51b-4111-8c96-309e6f65b696, 63, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 914c35ba-ceb4-4764-9cbb-b449dfd8756e)

SynapseWidget(Synapse.DataFrame, 2e7bd876-7708-4ab7-b1e8-d8df1053a3e0)

In [55]:
# Compute per-country totals from freshly read tables. Deriving dfactual and
# dftarget from their own previous values would make this cell fail on a
# second run, because the aggregated frames no longer have Actual / Target.
actual_rows = spark.read.table("MtoMActual")
dfactual = (
    actual_rows
    # Cast to int before summing.
    .select(actual_rows.Country, actual_rows.Actual.cast("int"))
    .groupBy("Country")
    .sum("Actual")
    # "ActualTotoal" (sic) is kept: the SQL cells query this exact column name.
    .withColumnRenamed("sum(Actual)", "ActualTotoal")
)
display(dfactual)

target_rows = spark.read.table("MtoMTarget")
dftarget = (
    target_rows
    .select(target_rows.Country, target_rows.Target.cast("int"))
    .groupBy("Country")
    .sum("Target")
    # "TargetTotoal" (sic) is kept for the same reason.
    .withColumnRenamed("sum(Target)", "TargetTotoal")
)
display(dftarget)

StatementMeta(, 51a658ec-c51b-4111-8c96-309e6f65b696, 64, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, bf574b95-3469-4319-8ad5-8835ac5f109c)

SynapseWidget(Synapse.DataFrame, 8669a435-2795-46fa-976d-3012b2ebb6c9)

In [56]:
# Persist both per-country totals as Delta tables, overwriting existing ones.
for summary_df, table_name in (
    (dfactual, "MtoActualSum"),
    (dftarget, "MtoTargetSum"),
):
    summary_df.write.format("delta").mode("overwrite").saveAsTable(table_name)

StatementMeta(, 51a658ec-c51b-4111-8c96-309e6f65b696, 65, Finished, Available, Finished)

In [57]:
# Join on equal Country values. Because the condition is a column expression,
# the result contains the Country column from both frames.
same_country = dfactual.Country == dftarget.Country

# Show the same join as an inner join (join()'s default), then left, then right.
for join_type in ("inner", "left", "right"):
    display(dfactual.join(dftarget, same_country, join_type))

StatementMeta(, 51a658ec-c51b-4111-8c96-309e6f65b696, 66, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, b1fbe232-ac59-4c77-b2b2-7387507d51e8)

SynapseWidget(Synapse.DataFrame, 7f414318-bedd-4fea-a6c2-dedbb5c3be0e)

SynapseWidget(Synapse.DataFrame, a10ba9be-275f-4413-8236-20764a6e5755)

In [68]:
# Give each side's Country column its own name so both can be told apart
# after the join.
dfactualrenamed = dfactual.withColumnRenamed("Country", "ActualCountry")
dftargetrenamed = dftarget.withColumnRenamed("Country", "TargetCountry")

# Full join: keeps unmatched rows from both sides.
dfjoin = dfactualrenamed.join(
    dftargetrenamed,
    on=dfactualrenamed.ActualCountry == dftargetrenamed.TargetCountry,
    how="full",
)
display(dfjoin)


StatementMeta(, 51a658ec-c51b-4111-8c96-309e6f65b696, 77, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 97ace281-11a8-4620-957d-4b22eb01cffd)

In [None]:
# Persist the full-join result as a Delta table, overwriting any existing table.
(
    dfjoin.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("MtoMJoin")
)

In [59]:
%%sql
-- Full join of the two per-country totals.
-- Replace FULL JOIN with JOIN, LEFT JOIN or RIGHT JOIN to try the other join types.
SELECT
    a.Country AS ActualCountry,
    a.ActualTotoal,
    t.Country AS TargetCountry,
    t.TargetTotoal
FROM MtoActualSum AS a
FULL JOIN MtoTargetSum AS t
    ON a.Country = t.Country


StatementMeta(, 51a658ec-c51b-4111-8c96-309e6f65b696, 68, Finished, Available, Finished)

<Spark SQL result set with 4 rows and 4 fields>

### Identify and resolve missing data or null values

In [66]:
# Reload the row-level tables and preview them.
dfactual = spark.read.table("MtoMActual")
dftarget = spark.read.table("MtoMTarget")
display(dfactual)
display(dftarget)

# Joining on the column *name* keeps a single Country column in the result.
# Joining on an expression instead, e.g.
#     dfactual.join(dftarget, dfactual.Country == dftarget.Country, "right")
# keeps the Country column from both sides.
dfRightJoin = dfactual.join(dftarget, on="Country", how="right")

# Show the joined rows whose Actual is null.
actual_is_missing = dfRightJoin.Actual.isNull()
display(dfRightJoin.filter(actual_is_missing))

StatementMeta(, 51a658ec-c51b-4111-8c96-309e6f65b696, 75, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 0cf7ecbc-0e66-4f62-b8d9-315c5dd549e1)

SynapseWidget(Synapse.DataFrame, 5991a89d-026a-4689-8fbc-2035742b4a0d)

SynapseWidget(Synapse.DataFrame, 1229375b-e4f7-42d0-9b86-6a4bfd3088d4)

In [72]:
# Build the row-level full join inside this cell. When the notebook runs top
# to bottom, the dfjoin from the earlier full-join cell is built from the
# per-country totals and has no Location / Actual / Type / Target columns,
# so the select below would fail on it.
dfactualdetail = spark.read.table("MtoMActual").withColumnRenamed("Country", "ActualCountry")
dftargetdetail = spark.read.table("MtoMTarget").withColumnRenamed("Country", "TargetCountry")
dfjoindetail = dfactualdetail.join(
    dftargetdetail,
    dfactualdetail.ActualCountry == dftargetdetail.TargetCountry,
    "full",
)
display(dfjoindetail)

# Label the country column on whichever side found no match in the full join.
dfjoinfilled = dfjoindetail.fillna(
    {"ActualCountry": "(No Actual)", "TargetCountry": "(No Target)"}
)

# Cast the measures to int so the numeric fill below applies to them.
dfjoin = dfjoinfilled.select(
    dfjoinfilled.ActualCountry, dfjoinfilled.Location,
    dfjoinfilled.Actual.cast("int"),
    dfjoinfilled.TargetCountry, dfjoinfilled.Type,
    dfjoinfilled.Target.cast("int"),
)

# na.fill(0) only replaces nulls in numeric columns; columns of any other
# type are left untouched.
dfjoin = dfjoin.na.fill(0)
display(dfjoin)

StatementMeta(, 51a658ec-c51b-4111-8c96-309e6f65b696, 81, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 1b6c2c5a-6889-4320-9e6d-34ecdaaaff59)

SynapseWidget(Synapse.DataFrame, 88d5b637-0da1-415b-bf18-35f9d7d5db8f)

In [76]:
%%sql
-- Full join of the per-country totals with nulls replaced:
-- a missing country becomes a label, a missing total becomes 0.
SELECT
    IFNULL(a.Country, "(No Actual)") AS ActualCountry,
    COALESCE(a.ActualTotoal, 0) AS ActualTotoal,
    IFNULL(t.Country, "(No Target)") AS TargetCountry,
    COALESCE(t.TargetTotoal, 0) AS TargetTotoal
FROM Mtoactualsum AS a
FULL JOIN Mtotargetsum AS t
    ON a.Country = t.Country

StatementMeta(, 51a658ec-c51b-4111-8c96-309e6f65b696, 85, Finished, Available, Finished)

<Spark SQL result set with 4 rows and 4 fields>