In [0]:
import pyspark.sql.functions as f
from pyspark.sql.types import IntegerType,StringType,DateType,FloatType,DecimalType
from pyspark.sql.window import Window
from random import randint,uniform
from datetime import datetime
import decimal
myappdl_sas = dbutils.secrets.get("myApp_scope","SasToken")
spark.conf.set("fs.azure.account.auth.type.myappdl.dfs.core.windows.net", "SAS")
spark.conf.set("fs.azure.sas.token.provider.type.myappdl.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set("fs.azure.sas.fixed.token.myappdl.dfs.core.windows.net", myappdl_sas)
spark.conf.set("Spark.jars","dbfs:/FileStore/mssql_jdbc_12_2_0_jre11.jar")
jdbcHostname=dbutils.secrets.get("myApp_scope","jdbcHostname")
jdbcDatabase=dbutils.secrets.get("myApp_scope","jdbcDatabase")
jdbcPassword=dbutils.secrets.get("myApp_scope","jdbcPassword")
jdbcUsername=dbutils.secrets.get("myApp_scope","jdbcUsername")
jdbcURL='jdbc:sqlserver://'+jdbcHostname+':1433;database={'+jdbcDatabase+'}'
connectionProperties ={
    "user":jdbcUsername,
    "password":jdbcPassword,
    "driver":"com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
@udf(returnType=IntegerType()) 
def rndm():
    return randint(1,1000)
@udf(returnType=IntegerType())
def indx():
    arr=[1,4,6,12]
    return arr[randint(0,3)]
@udf(returnType=DateType())
def NBD():
    arr=[str(i) for i in list(range(1,13,1))]
    return datetime.strptime('01/'+arr[randint(0,11)]+'/2023',"%d/%m/%Y")
@udf(returnType=StringType())
def randomNum():
    return ['Active','Ended'][randint(0,1)]
@udf(returnType=DecimalType(20,2))
def rndmDecimal():
    return decimal.Decimal(uniform(1,1000))

def createContract():
    contracts = spark.read.jdbc(url=jdbcURL,table='Contracts',properties=connectionProperties)
    company = spark.read.jdbc(url=jdbcURL,table='Company',properties=connectionProperties)
    contracts.createOrReplaceTempView('Conts')
    company.createOrReplaceTempView('company')
    maxid=contracts.agg({"id": "max"}).collect()[0]["max(id)"]
    if maxid==None:
        maxid=0
    contractCntBYyNSO = spark.sql("Select distinct c.companyid companyid,count(*) mx from company n left join conts c group by c.companyid")
    if(len(contracts.head(1))==0):
        contractCntBYyNSO = spark.sql("Select distinct id companyid,0 mx from company")
    
    contracts1=users.alias('u').join(f.broadcast(contractCntBYyNSO).alias('cn'),f.col('u.companyid')==f.col('cn.companyid'),'inner')\
        .join(f.broadcast(company).alias('c'),f.col('u.companyid')==f.col('c.id'),'inner')\
        .join(user_data.alias('ud'),f.col('u.id')==f.col('ud.userid'),'inner')\
        .select((maxid+f.row_number().over(Window.orderBy(f.col('u.id').asc()))).alias('id'),\
        f.concat(f.substring(f.col('c.country'),0,4),f.lit('-'),(f.row_number().over(Window.partitionBy(f.col('u.companyid')).orderBy(f.col('u.id').asc()))+f.col('cn.mx')).cast(StringType())).alias('contractNumber'),\
        f.col('u.id').alias('userid'),\
        f.col('u.startdate').cast(DateType()).alias('startdate'),\
        f.col('ud.id').alias('invoiceaddressid'),\
        f.col('u.companyid').alias('companyid'))
    contracts1=contracts1.withColumn("productid",rndm()).withColumn("billingFreq",indx()).withColumn("nextBillingDate",NBD())\
    .withColumn('BillingStatus',f.lit('Active')).withColumn('ContractStatus',randomNum()).withColumn('serviceCharge',rndmDecimal())
    if(len(contracts1.head(1))!=0):
        display(contracts1)
        dirname = datetime.today().strftime('%Y-%m-%d')
        contracts1.write.csv('abfss://raw@myappdl.dfs.core.windows.net/Contracts/Contract_'+dirname+'/',mode='overwrite',header=True)
        contracts1.write.jdbc(url=jdbcURL,table='Contracts',mode='append',properties=connectionProperties)
        return True
    return False
    


rawData=spark.read.csv('abfss://raw@myappdl.dfs.core.windows.net/API_data/UserData*.csv',header=True)
nso = spark.read.jdbc(url=jdbcURL,table='Company',properties=connectionProperties)
nso.createOrReplaceTempView('NSO')
rawData.createOrReplaceTempView('temp_table')

users = spark.read.jdbc(url=jdbcURL,table='Users',properties=connectionProperties)
users.createOrReplaceTempView('users')
maxid = users.agg({"id": "max"}).collect()[0]["max(id)"]
if maxid==None:
    maxid= 0
users = spark.sql(f"select distinct {maxid}+row_number() over (order by t.registration_date asc) id,t.title||' '||t.firstname||' '||t.lastname name,\
    t.registration_date StartDate, t.uuid uniqueidentifier, t.dob_date dob,t.gender gender,t.username username, t.password password, n.id companyid \
        from temp_table t inner join NSO n on n.country=upper(t.country) \
            where not exists (select 1 from users u where u.uniqueidentifier=t.uuid)")
users.createOrReplaceTempView('users')
display(users)
user_data= spark.sql(f"select distinct {maxid}+row_number() over (partition by n.id order by t.uuid asc) id, u.id userid, \
    t.title||' '||t.firstname||' '||t.lastname||', '||t.streetName||'-'||t.streetnumber||', '||t.city||', '||t.state||', '||t.country||'- '||t.postcode invoiceaddress, \
        n.id companyid, t.latitude lat, t.longitude lng, t.email emailid, t.phone phone, t.cell mobno\
            from temp_table t inner join NSO n on n.country=upper(t.country)\
                inner join users u on u.uniqueidentifier=t.uuid")
user_data.createOrReplaceTempView("userdata")
display(user_data)

if(len(users.head(1))!=0 & createContract()):
    dirname = datetime.today().strftime('%Y-%m-%d')
    users.write.csv('abfss://raw@myappdl.dfs.core.windows.net/User/Users_'+dirname+'/',mode='overwrite',header=True)
    user_data.write.csv('abfss://raw@myappdl.dfs.core.windows.net/User/UserData_'+dirname+'/',mode='overwrite',header=True)
    users.write.jdbc(url=jdbcURL,table='Users',mode='append',properties=connectionProperties)
    user_data.write.jdbc(url=jdbcURL,table='UserData',mode='append',properties=connectionProperties)
