In [1]:
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import StringType,DateType,LongType,IntegerType,TimestampType

#JDBC connect details for SQL Server database
#NOTE(review): these values look like placeholders (each equals its own
#variable name). Supply real values from a secret store or environment
#variables rather than committing them to the notebook.
jdbcHostname = "jdbcHostname"
jdbcDatabase = "Movies"
jdbcUsername = "jdbcUsername"
jdbcPassword = "jdbcPassword"
jdbcPort = "1433"

#credentials and driver class are handed to the JDBC reader via its
#"properties" argument
connectionProperties = {
  "user" : jdbcUsername,
  "password" : jdbcPassword,
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}

#the URL carries only host, port and database. User and password are NOT
#embedded here: they are already supplied through connectionProperties, a
#password containing ';' would break the ';'-delimited URL, and a URL string
#is easy to leak when it is printed or logged.
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)

#options passed to the Cosmos DB Spark connector when writing documents
writeConfig = {
    "Endpoint": "Endpoint",
    "Masterkey": "Masterkey",
    "Database": "Movies",
    "Collection": "Orders",
    "Upsert": "true"
}

In [2]:
import json
import ast
import pyspark.sql.functions as F
import uuid
import numpy as np
from functools import reduce
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql import *
from pyspark.sql.functions import exp
from pyspark.sql.functions import col
from pyspark.sql.functions import lit
from pyspark.sql.functions import array
from pyspark.sql.types import *
from multiprocessing.pool import ThreadPool

#load the full "orders" and "orderdetails" tables from SQL Server over JDBC,
#each through its own reader and with the shared connection properties
orders, orderdetails = (
  sqlContext.read.jdbc(jdbcUrl, tablename, properties=connectionProperties)
  for tablename in ("orders", "orderdetails")
)

#collect every OrderId to the driver; each collected row is later handed to
#the map function as its single argument
cols = orders.select("OrderId").collect()

#thread pool of 10 workers used to merge details into orders in parallel
pool = ThreadPool(processes=10)

def writeOrder(col):
  """Merge one order with its detail rows and write the document to Cosmos DB.

  The order row is read from the module-level ``orders`` dataframe and the
  matching detail rows from ``orderdetails``. The details are embedded as a
  JSON array under the ``details`` field, a fresh uuid is stored in ``id``,
  and the result is written with the Cosmos DB Spark connector using
  ``writeConfig``.

  Args:
    col: Row-like value whose first element is the OrderId to process
      (``pool.map`` passes one collected row per call). The name shadows the
      imported ``pyspark.sql.functions.col`` inside this function; it is kept
      unchanged so existing callers keep working.

  Returns:
    None. Nothing is written when no order row matches the id.
  """
  order_id = col[0]

  #filter the order on the current value passed from the map function, set
  #"id" to a new uuid and add a default (empty string) "details" field
  order = (
    orders.filter(orders['OrderId'] == order_id)
    .withColumn("id", lit(str(uuid.uuid1())))
    .withColumn("details", lit(''))
  )

  #serialize through pandas (as before) and parse the result into plain
  #dicts, so the nested details can be attached as real JSON values instead
  #of patching the JSON text with string replacements
  records = json.loads(order.toPandas().to_json(orient='records'))
  if not records:
    #no order row for this id: there is nothing to write
    return

  #collect the detail rows once; an empty list means "no details", so a
  #separate count() job is not needed
  detailjson = orderdetails.filter(orderdetails['OrderId'] == order_id).toJSON().collect()
  if detailjson:
    details = [json.loads(detail) for detail in detailjson]
    #presumably OrderId is unique, so this is normally a single record
    for record in records:
      record['details'] = details
  #when there are no details the field keeps its '' default, as before

  #json.dumps escapes quotes, apostrophes and backslashes in the data
  #correctly, so values such as "O'Brien" no longer produce invalid JSON
  df = spark.read.json(sc.parallelize([json.dumps(record) for record in records]))

  #write the dataframe (a single order record with merged many-to-one order details) to cosmos db using spark connector
  df.write.format("com.microsoft.azure.cosmosdb.spark").mode("append").options(**writeConfig).save()

#map order details to orders in parallel using the above function
try:
  #blocks until every order has been processed; re-raises a worker's exception
  pool.map(writeOrder, cols)
finally:
  #always release the worker threads, even when a write fails, so that
  #re-running the cell does not leak a pool per run
  pool.close()
  pool.join()

  
