In [2]:
# Import the necessary libraries
from optimus import Optimus
from pyspark.sql.functions import *
import pandas as pd
import numpy as np
op = Optimus()

# Create sample data

In [3]:
raw_data_1 = {
        'subject_id': ['1', '2', '3', '4', '5'],
        'first_name': ['Alex', 'Amy', 'Allen', 'Alice', 'Ayoung'], 
        'last_name': ['Anderson', 'Ackerman', 'Ali', 'Aoni', 'Atiches']}

raw_data_2 = {
        'subject_id': ['4', '5', '6', '7', '8'],
        'first_name': ['Billy', 'Brian', 'Bran', 'Bryce', 'Betty'], 
        'last_name': ['Bonder', 'Black', 'Balwner', 'Brice', 'Btisan']}

raw_data_3 = {
        'subject_id': ['1', '2', '3', '4', '5', '7', '8', '9', '10', '11'],
        'test_id': [51, 15, 15, 61, 16, 14, 15, 1, 61, 16]}



In [4]:
data1_pd = pd.DataFrame(raw_data_1, columns = ['subject_id', 'first_name', 'last_name'])
data2_pd = pd.DataFrame(raw_data_2, columns = ['subject_id', 'first_name', 'last_name'])
data3_pd = pd.DataFrame(raw_data_3, columns = ['subject_id','test_id'])

# Create Spark DF

In [5]:
data_1 = op.spark.createDataFrame(data1_pd)
data_2 = op.spark.createDataFrame(data2_pd)
data_3 = op.spark.createDataFrame(data3_pd)

In [6]:
data_1.table(3)

subject_id  1 (string)  nullable,first_name  2 (string)  nullable,last_name  3 (string)  nullable
1,Alex,Anderson
2,Amy,Ackerman
3,Allen,Ali


In [7]:
data_2.table(3)

subject_id  1 (string)  nullable,first_name  2 (string)  nullable,last_name  3 (string)  nullable
4,Billy,Bonder
5,Brian,Black
6,Bran,Balwner


In [8]:
data_3.table(3)

subject_id  1 (string)  nullable,test_id  2 (bigint)  nullable
1,51
2,15
3,15


# Join the two dataframes along rows and assign all_data

In [10]:
all_data = data_1.union(data_2)

In [11]:
all_data.table()

subject_id  1 (string)  nullable,first_name  2 (string)  nullable,last_name  3 (string)  nullable
1,Alex,Anderson
2,Amy,Ackerman
3,Allen,Ali
4,Alice,Aoni
5,Ayoung,Atiches
4,Billy,Bonder
5,Brian,Black
6,Bran,Balwner
7,Bryce,Brice
8,Betty,Btisan


# Join the two dataframes along columns and assing to all_data_col

In [12]:
from pyspark.sql.functions import monotonically_increasing_id

In [27]:
# This is not that pretty
data_1_id = data_1.withColumn("id", monotonically_increasing_id())
data_2_id = data_2.withColumn("id", monotonically_increasing_id())

all_data_col = data_1_id.join(data_2_id, "id", "outer").select(data_1_id.subject_id, data_1_id.first_name, 
                                                              data_1_id.last_name, 
                                                              data_2_id.subject_id.alias("subject_id_df2"), 
                                                              data_2_id.first_name.alias("first_name_df2"), 
                                                              data_2_id.last_name.alias("last_name_df2")).drop("id")

In [28]:
all_data_col.table()

subject_id  1 (string)  nullable,first_name  2 (string)  nullable,last_name  3 (string)  nullable,subject_id_df2  4 (string)  nullable,first_name_df2  5 (string)  nullable,last_name_df2  6 (string)  nullable
1,Alex,Anderson,4,Billy,Bonder
2,Amy,Ackerman,5,Brian,Black
4,Alice,Aoni,7,Bryce,Brice
3,Allen,Ali,6,Bran,Balwner
5,Ayoung,Atiches,8,Betty,Btisan


What happened above is that Spark needs a unique identifier for each column, so you need to rename the first or the second set of columns for it to work. I know, not great, not terrible.

# Merge all_data and data_3 along the subject_id value

In [30]:
all_data.join(data_3, on="subject_id").table()

subject_id  1 (string)  nullable,first_name  2 (string)  nullable,last_name  3 (string)  nullable,test_id  4 (bigint)  nullable
7,Bryce,Brice,14
3,Allen,Ali,15
8,Betty,Btisan,15
5,Ayoung,Atiches,16
5,Brian,Black,16
1,Alex,Anderson,51
4,Alice,Aoni,61
4,Billy,Bonder,61
2,Amy,Ackerman,15


# Merge only the data that has the same 'subject_id' on both data_1 and data_2

In [53]:
data_1.alias("a").join(data_2.alias("b"), \
                       on= col("a.subject_id") == col("b.subject_id"), \
                       how= "inner").show()                                                                                                       

+----------+----------+---------+----------+----------+---------+
|subject_id|first_name|last_name|subject_id|first_name|last_name|
+----------+----------+---------+----------+----------+---------+
|         5|    Ayoung|  Atiches|         5|     Brian|    Black|
|         4|     Alice|     Aoni|         4|     Billy|   Bonder|
+----------+----------+---------+----------+----------+---------+



Here the trick is to create an alias for each dataframe, otherwise Spark will complain.

# Merge all values in data1 and data2, with matching records from both sides where available.

In [54]:
data_1.alias("a").join(data_2.alias("b"), \
                       on= col("a.subject_id") == col("b.subject_id"), \
                       how= "outer").show() 

+----------+----------+---------+----------+----------+---------+
|subject_id|first_name|last_name|subject_id|first_name|last_name|
+----------+----------+---------+----------+----------+---------+
|      null|      null|     null|         7|     Bryce|    Brice|
|         3|     Allen|      Ali|      null|      null|     null|
|      null|      null|     null|         8|     Betty|   Btisan|
|         5|    Ayoung|  Atiches|         5|     Brian|    Black|
|      null|      null|     null|         6|      Bran|  Balwner|
|         1|      Alex| Anderson|      null|      null|     null|
|         4|     Alice|     Aoni|         4|     Billy|   Bonder|
|         2|       Amy| Ackerman|      null|      null|     null|
+----------+----------+---------+----------+----------+---------+

