In [8]:
import os
import sys

os.environ["SPARK_HOME"] = "/opt/spark"
spark_home = os.environ.get('SPARK_HOME', None)
sys.path.insert(0, os.path.join(spark_home, 'python'))
sys.path.insert(0, os.path.join(spark_home, 'python/lib'))

In [15]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").getOrCreate()
assert spark.range(5).rdd.flatMap(lambda x: x).sum() == 10

In [20]:
person = spark.createDataFrame([
    (0, "Ben Benson", 0, [100]),
    (1, "Tom Tomhson", 1, [500, 250, 100]),
    (2, "Jill Jilli", 1, [250, 100])
]).toDF("id", "name", "study_program", "spark_status")

In [21]:
person.show()

+---+-----------+-------------+---------------+
| id|       name|study_program|   spark_status|
+---+-----------+-------------+---------------+
|  0| Ben Benson|            0|          [100]|
|  1|Tom Tomhson|            1|[500, 250, 100]|
|  2| Jill Jilli|            1|     [250, 100]|
+---+-----------+-------------+---------------+



In [24]:
studyProgram = spark.createDataFrame([
    (0, "Masters", "Apache Apex", "Great university"),
    (1, "Masters", "Apache Spark", "Great university"),
    (2, "Ph.D.", "Apache Spark", "Great university")
]).toDF("id", "degree", "department", "school")
studyProgram.show()

+---+-------+------------+----------------+
| id| degree|  department|          school|
+---+-------+------------+----------------+
|  0|Masters| Apache Apex|Great university|
|  1|Masters|Apache Spark|Great university|
|  2|  Ph.D.|Apache Spark|Great university|
+---+-------+------------+----------------+



In [25]:
sparkStatus = spark.createDataFrame([
    (500, "Vice President"),
    (250, "PMC Member"),
    (100, "Contributor")
]).toDF("id", "status")
sparkStatus.show()

+---+--------------+
| id|        status|
+---+--------------+
|500|Vice President|
|250|    PMC Member|
|100|   Contributor|
+---+--------------+



In [43]:
from pyspark.sql.functions import array_contains
person.where(array_contains("spark_status", "250")).show()

+---+-----------+-------------+---------------+
| id|       name|study_program|   spark_status|
+---+-----------+-------------+---------------+
|  1|Tom Tomhson|            1|[500, 250, 100]|
|  2| Jill Jilli|            1|     [250, 100]|
+---+-----------+-------------+---------------+



In [45]:
from pyspark.sql.functions import array_contains, col, lit
person.where(array_contains(col("spark_status"), lit("250"))).show()

TypeError: 'Column' object is not callable

In [33]:
from tabulate import tabulate
join_expr = person["study_program"] == studyProgram["id"]
for join_type in ["inner", "cross", "outer",
                  "full", "full_outer", "left", 
                  "left_outer", "right", "right_outer",
                  "left_semi", "left_anti"]:
    joined_df = studyProgram.join(person, join_expr, join_type)
    print("="*10 + join_type)
    print(tabulate(joined_df.collect(), joined_df.columns))

  id  degree    department    school              id  name           study_program  spark_status
----  --------  ------------  ----------------  ----  -----------  ---------------  ---------------
   0  Masters   Apache Apex   Great university     0  Ben Benson                 0  [100]
   1  Masters   Apache Spark  Great university     1  Tom Tomhson                1  [500, 250, 100]
   1  Masters   Apache Spark  Great university     2  Jill Jilli                 1  [250, 100]
  id  degree    department    school              id  name           study_program  spark_status
----  --------  ------------  ----------------  ----  -----------  ---------------  ---------------
   0  Masters   Apache Apex   Great university     0  Ben Benson                 0  [100]
   1  Masters   Apache Spark  Great university     1  Tom Tomhson                1  [500, 250, 100]
   1  Masters   Apache Spark  Great university     2  Jill Jilli                 1  [250, 100]
  id  degree    department    school

In [35]:
df1 = spark.createDataFrame([
    (1, "aaa"),
    (2, "bbb"),
    (3, "bbb"),    
]).toDF("id", "name1")

df2 = spark.createDataFrame([
    (0, "cow"),
    (1, "goose"),
    (1, "duck"),    
]).toDF("id", "name2")

In [38]:
from pyspark.sql.functions import col
df1.join(df2, col("id") == col("id")).show()

AnalysisException: u"Reference 'id' is ambiguous, could be: id#597L, id#607L.;"

In [41]:
df1.crossJoin(df2).show()

+---+-----+---+-----+
| id|name1| id|name2|
+---+-----+---+-----+
|  1|  aaa|  0|  cow|
|  1|  aaa|  1|goose|
|  1|  aaa|  1| duck|
|  2|  bbb|  0|  cow|
|  2|  bbb|  1|goose|
|  2|  bbb|  1| duck|
|  3|  bbb|  0|  cow|
|  3|  bbb|  1|goose|
|  3|  bbb|  1| duck|
+---+-----+---+-----+



In [39]:
from pyspark.sql.functions import col
df1.join(df2, df1["id"] == df2["id"]).show()

+---+-----+---+-----+
| id|name1| id|name2|
+---+-----+---+-----+
|  1|  aaa|  1|goose|
|  1|  aaa|  1| duck|
+---+-----+---+-----+



In [40]:
df1.join(df2, "id").show()

+---+-----+-----+
| id|name1|name2|
+---+-----+-----+
|  1|  aaa|goose|
|  1|  aaa| duck|
+---+-----+-----+



In [50]:
from pyspark.sql.functions import monotonically_increasing_id
df1_1 = df1.withColumn("addition_first", monotonically_increasing_id())
df1_2 = df1.withColumn("addition_two", monotonically_increasing_id())

df1_1.join(df1_2, ["id", "name1"]).show()

+---+-----+--------------+------------+
| id|name1|addition_first|addition_two|
+---+-----+--------------+------------+
|  2|  bbb|             1|           1|
|  1|  aaa|             0|           0|
|  3|  bbb|             2|           2|
+---+-----+--------------+------------+



In [51]:
join_expr = person["study_program"] == studyProgram["id"]
joined_df = studyProgram.join(person, join_expr, "inner")
joined_df.explain()

== Physical Plan ==
*SortMergeJoin [id#114L], [study_program#68L], Inner
:- *Sort [id#114L ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id#114L, 200)
:     +- *Project [_1#105L AS id#114L, _2#106 AS degree#115, _3#107 AS department#116, _4#108 AS school#117]
:        +- *Filter isnotnull(_1#105L)
:           +- Scan ExistingRDD[_1#105L,_2#106,_3#107,_4#108]
+- *Sort [study_program#68L ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(study_program#68L, 200)
      +- *Project [_1#57L AS id#66L, _2#58 AS name#67, _3#59L AS study_program#68L, _4#60 AS spark_status#69]
         +- *Filter isnotnull(_3#59L)
            +- Scan ExistingRDD[_1#57L,_2#58,_3#59L,_4#60]


In [52]:
from pyspark.sql.functions import broadcast
join_expr = person["study_program"] == studyProgram["id"]
joined_df = studyProgram.join(broadcast(person), join_expr, "inner")
joined_df.explain()

== Physical Plan ==
*BroadcastHashJoin [id#114L], [study_program#68L], Inner, BuildRight
:- *Project [_1#105L AS id#114L, _2#106 AS degree#115, _3#107 AS department#116, _4#108 AS school#117]
:  +- *Filter isnotnull(_1#105L)
:     +- Scan ExistingRDD[_1#105L,_2#106,_3#107,_4#108]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[2, bigint, true]))
   +- *Project [_1#57L AS id#66L, _2#58 AS name#67, _3#59L AS study_program#68L, _4#60 AS spark_status#69]
      +- *Filter isnotnull(_3#59L)
         +- Scan ExistingRDD[_1#57L,_2#58,_3#59L,_4#60]


In [72]:
from tabulate import tabulate
df = df1.join(df2, "id")
first = ["{:<34}".format(line) for line 
                   in  tabulate(df.collect(), df.columns).split("\n")]

second = ["{:<20}".format(line) for line 
                   in  tabulate(df.collect(), df.columns).split("\n")]

print("\n".join([t[0] + t[1] for t in zip(first, second)]))

  id  name1    name2                id  name1    name2
----  -------  -------            ----  -------  -------
   1  aaa      goose                 1  aaa      goose
   1  aaa      duck                  1  aaa      duck 


##### help(pad)