In [2]:
import pandas as pd
import pyspark
from pyspark.sql import SparkSession, functions as F

In [3]:
# Fragments that are concatenated (interleaved with the year) into workbook
# paths of the form ./Data/<year><startpath><year>[<release>]<year>[<part>]<finalpath>.
startpath = "_Star_Ratings_and_Display_Measures/"
fallpath = "_Star_Ratings_Fall_Release/"
springpath = "_Star_Ratings_Spring_Release/"
# Suffixes selecting the separate Part C / Part D workbooks (used for 2014).
cpath = "_Part_C"
dpath = "_Part_D"
finalpath = "_Report_Card_Master_Table.xlsx"
# NOTE(review): none of the functions visible in this notebook read this
# global (standardize_data assigns its own local `year`) -- confirm it is
# still needed.
year = "2014"



In [4]:
# Names of the five leading identifier columns. They replace the
# "Unnamed: 0".."Unnamed: 4" placeholders in the Measure_Stars sheet and serve
# as the join keys whenever the per-sheet frames are merged.
firstfive = [
    "Contract Number",
    "Organization Type",
    "Contract Name",
    "Organization Marketing Name",
    "Parent Organization",
]

In [5]:
def get_measure_stars(path):
    """Load the ``Measure_Stars`` sheet of a workbook and normalise its headers.

    The sheet is read with ``header=2`` (the third spreadsheet row supplies the
    column labels) and the first row beneath that header is discarded.
    Columns from position ``len(firstfive)`` onward have everything up to and
    including the first ``": "`` removed, so ``"C01: Breast Cancer Screening"``
    becomes ``"Breast Cancer Screening"``.  The pandas placeholder labels
    ``"Unnamed: 0"`` .. ``"Unnamed: 4"`` are replaced by the ``firstfive`` names.

    A measure header without a ``": "`` separator is left unchanged (the
    previous ``split(': ')[1]`` raised ``IndexError`` on it), and a header that
    contains the separator more than once keeps all of its text after the
    first occurrence instead of being truncated at the second one.

    Args:
        path: Workbook location, handed straight to ``pandas.read_excel``.

    Returns:
        pandas.DataFrame: the sheet contents with the renamed columns.
    """
    dfms = pd.read_excel(path, sheet_name="Measure_Stars", header=2)
    # Skip the first row under the header.
    # NOTE(review): presumably a non-contract sub-header row -- confirm
    # against the workbook layout.
    dfms = dfms.iloc[1:]

    id_count = len(firstfive)
    renames = {}
    # Strip the measure-code prefix ("C01: ") from every measure column.
    for label in dfms.columns[id_count:]:
        if isinstance(label, str) and ": " in label:
            renames[label] = label.split(": ", 1)[1]
    # Give the unlabeled identifier columns their real names.
    for position, name in enumerate(firstfive):
        renames[f"Unnamed: {position}"] = name

    # One rename call instead of rebuilding the frame once per column.
    return dfms.rename(columns=renames)

In [6]:
def get_domain_stars(path):
    """Read the ``Domain_Stars`` sheet of the workbook at ``path``.

    The column labels come from the second spreadsheet row (``header=1``);
    the resulting DataFrame is returned without further processing.
    """
    return pd.read_excel(path, sheet_name="Domain_Stars", header=1)

In [7]:
def get_summary_rating(path):
    """Read the ``Summary_Rating`` sheet of the workbook at ``path``.

    The column labels come from the second spreadsheet row (``header=1``).
    A ``"Sanction Deduction"`` column is removed when present; workbooks
    without that column are returned as read.
    """
    summary = pd.read_excel(path, sheet_name="Summary_Rating", header=1)
    return summary.drop(columns="Sanction Deduction", errors='ignore')

In [8]:
def get_early(y):
    """Build one pandas frame holding the Fall and Spring releases of year ``y``.

    For each release the Measure_Stars, Domain_Stars and Summary_Rating sheets
    of that release's master-table workbook are left-joined on the
    ``firstfive`` identifier columns, a ``"Year"`` label (``"<y> Fall"`` /
    ``"<y> Spring"``) is attached, and the year-prefixed summary columns are
    renamed to year-independent names.  The two releases are then stacked
    (Fall first) and a ``"2017 Disaster %"`` column is dropped if present.

    Args:
        y: The year used to build the workbook paths and the column prefixes.

    Returns:
        pandas.DataFrame: Fall rows followed by Spring rows; the original row
        indices of each release are kept.
    """
    releases = []
    for release_dir, season in ((fallpath, "Fall"), (springpath, "Spring")):
        workbook = f"./Data/{y}{startpath}{y}{release_dir}{y}{finalpath}"
        measures = get_measure_stars(workbook)
        domains = get_domain_stars(workbook)
        summary = get_summary_rating(workbook)

        merged = measures.merge(domains, on=firstfive, how='left')
        merged = merged.merge(summary, on=firstfive, how='left')
        merged["Year"] = f"{y} {season}"
        # Strip the year prefix so the summary columns line up across years.
        merged = merged.rename(columns={
            f"{y} Part C Summary": "Part C Summary",
            f"{y} Overall": "Overall",
            f"{y} Part D Summary": "Part D Summary",
        })
        releases.append(merged)

    stacked = pd.concat(releases)
    return stacked.drop(columns="2017 Disaster %", errors='ignore')

In [9]:
def get_late(y):
    """Build the pandas frame for year ``y`` from its single master-table workbook.

    The Measure_Stars, Domain_Stars and Summary_Rating sheets are left-joined
    on the ``firstfive`` identifier columns.  Before joining, the summary
    sheet loses its columns at positions 6 and 7 (the disaster columns).
    Year-prefixed summary columns are renamed to year-independent names and a
    ``"Year"`` column holding ``str``-formatted ``y`` is appended last.

    Args:
        y: The year used to build the workbook path and the column prefixes.

    Returns:
        pandas.DataFrame: one merged frame for the year.
    """
    workbook = f"./Data/{y}{startpath}{y}{finalpath}"
    measures = get_measure_stars(workbook)
    domains = get_domain_stars(workbook)
    summary = get_summary_rating(workbook)

    # Drop the disaster columns, identified purely by position.
    # NOTE(review): assumes every workbook keeps them at positions 6 and 7.
    disaster_labels = summary.columns[[6, 7]]
    summary = summary.drop(columns=disaster_labels)

    combined = (
        measures
        .merge(domains, on=firstfive, how='left')
        .merge(summary, on=firstfive, how='left')
        .rename(columns={
            f"{y} Part C Summary": "Part C Summary",
            f"{y} Overall": "Overall",
            f"{y} Part D Summary": "Part D Summary",
        })
    )
    combined["Year"] = f"{y}"
    return combined

In [10]:
def combine_spark_dfs(df1, df2):
    """Union two Spark DataFrames by column name, padding missing columns.

    Every column that exists in only one of the inputs is added to the other
    as a ``NULL`` literal, after which the frames are combined with
    ``unionByName``.  Padding columns are appended after each frame's own
    columns, in the order they appear in the frame that does have them.

    Args:
        df1: Left Spark DataFrame; its rows come first.
        df2: Right Spark DataFrame.

    Returns:
        The Spark DataFrame produced by ``unionByName`` on the padded inputs.
    """
    only_in_df2 = [name for name in df2.columns if name not in df1.columns]
    only_in_df1 = [name for name in df1.columns if name not in df2.columns]

    padded_left = df1.select('*', *(F.lit(None).alias(name) for name in only_in_df2))
    padded_right = df2.select('*', *(F.lit(None).alias(name) for name in only_in_df1))
    return padded_left.unionByName(padded_right)

In [13]:
def _load_2014_release(release_dir, label, year=2014):
    """Merge the Part C and Part D workbooks of one 2014 release into one frame.

    For each part (C first, then D) the Measure_Stars, Domain_Stars and
    Summary_Rating frames are loaded and left-joined, in that order, onto the
    running result using the ``firstfive`` identifier columns.  The
    year-prefixed ``... Rating`` summary columns are then renamed to
    year-independent names and ``label`` is stored in a ``"Year"`` column.
    """
    merged = None
    for part in (cpath, dpath):
        workbook = f"./Data/{year}{startpath}{year}{release_dir}{year}{part}{finalpath}"
        for loader in (get_measure_stars, get_domain_stars, get_summary_rating):
            sheet = loader(workbook)
            merged = sheet if merged is None else pd.merge(merged, sheet, on=firstfive, how='left')
    merged = merged.rename(columns={
        f"{year} Part C Summary Rating": "Part C Summary",
        f"{year} Overall Rating": "Overall",
        # The Spring branch previously lacked the f-prefix on this key, so the
        # Part D summary column was never renamed for that release.
        f"{year} Part D Summary Rating": "Part D Summary",
    })
    merged["Year"] = label
    return merged


def standardize_data(preview=True):
    """Assemble all years (2014-2025) into a single Spark DataFrame.

    2014 ships separate Part C and Part D workbooks for its Fall and Spring
    releases; those are merged per release and stacked.  Years 2015-2019 are
    loaded with ``get_early`` and 2020-2025 with ``get_late``.  Each year is
    converted to a Spark DataFrame and appended with ``combine_spark_dfs``,
    which fills columns missing on either side with nulls.

    Args:
        preview: When true (the default, matching the previous behaviour) the
            2014 frame and every per-year frame are printed with ``.show()``.
            Each call runs a Spark job, so pass ``False`` to skip them.

    Returns:
        The combined Spark DataFrame.
    """
    frames_2014 = [
        _load_2014_release(fallpath, "2014 Fall"),
        _load_2014_release(springpath, "2014 Spring"),
    ]
    finaldf = pd.concat(frames_2014)

    spark = SparkSession.builder.appName("pandas to spark").getOrCreate()
    sparkdf = spark.createDataFrame(finaldf)
    if preview:
        sparkdf.show()

    for later_year in range(2015, 2026):
        # The workbook layout changes in 2020: one workbook per year instead
        # of separate Fall and Spring releases.
        if later_year < 2020:
            yearly = get_early(later_year)
        else:
            yearly = get_late(later_year)
        yearly_spark = spark.createDataFrame(yearly)
        if preview:
            yearly_spark.show()
        sparkdf = combine_spark_dfs(sparkdf, yearly_spark)
    return sparkdf

In [14]:
# Build the combined multi-year Spark DataFrame and display a preview of it.
combodf = standardize_data()
combodf.show()

Py4JJavaError: An error occurred while calling o150.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (192.168.1.27 executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/java.net.PlainSocketImpl.waitForNewConnection(Native Method)
	at java.base/java.net.PlainSocketImpl.socketAccept(PlainSocketImpl.java:163)
	at java.base/java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:474)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:565)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:533)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 32 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4333)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4323)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4321)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4321)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3316)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3539)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:203)
	at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:109)
	at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:124)
	at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:174)
	at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:67)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.base/java.net.PlainSocketImpl.waitForNewConnection(Native Method)
	at java.base/java.net.PlainSocketImpl.socketAccept(PlainSocketImpl.java:163)
	at java.base/java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:474)
	at java.base/java.net.ServerSocket.implAccept(ServerSocket.java:565)
	at java.base/java.net.ServerSocket.accept(ServerSocket.java:533)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:190)
	... 32 more


In [61]:
# Smoke test: load one "late" year (2020) and one "early" year (2016), convert
# both to Spark DataFrames and stack them.
spark1 = SparkSession.builder.appName("pandas to spark").getOrCreate()

testdf = get_late(2020)
testdf2 = get_early(2016)
sparktestdf = spark1.createDataFrame(testdf)
sparktestdf2 = spark1.createDataFrame(testdf2)
# DataFrame.union pairs columns by position, not by name, so it would line up
# unrelated columns whenever the two years' schemas differ.  Use the notebook's
# name-based helper, which also null-pads columns present on only one side.
sparktestdf = combine_spark_dfs(sparktestdf, sparktestdf2)
sparktestdf.show()

+---------------+--------------------+--------------------+---------------------------+--------------------+----------------------------+--------------------------------+-----------------------+---------------------------------------------+-------------------------------------------+---------------------------------+-------------------------+---------------------------------------------+----------------------------------------------+---------------------------------------------------------+--------------------------------------------+--------------------------------------------------------+-----------------------------+----------------------------------------------+-------------------------------------------+------------------------------------+---------------------------------+------------------------------+---------------------------------------------+--------------------------------+------------------------------------------------------------+------------------------+---------------

In [62]:
# Number of rows in the stacked 2020 + 2016 Spark test frame.
sparktestdf.count()

                                                                                

2027

In [60]:
# Per-column summary statistics of the 2016 pandas frame returned by get_early.
testdf2.describe()

Unnamed: 0,Contract Number,Organization Type,Contract Name,Organization Marketing Name,Parent Organization,C01: Breast Cancer Screening,C02: Colorectal Cancer Screening,C03: Annual Flu Vaccine,C04: Improving or Maintaining Physical Health,C05: Improving or Maintaining Mental Health,...,HD5: Health Plan Customer Service,DD1: Drug Plan Customer Service,DD2: Member Complaints and Changes in the Drug Plan’s Performance,DD3: Member Experience with the Drug Plan,DD4: Drug Safety and Accuracy of Drug Pricing,SNP,Part C Summary,Part D Summary,Overall,Year
count,1284,1284,1284,1284,1284,1284,1284,1284,1284,1284,...,1284,1284,1284,1284,1284,1284,1284,1284,1284,1284
unique,642,8,469,413,222,10,10,9,9,9,...,8,8,7,8,7,2,11,10,9,2
top,E0654,Local CCP,UNITEDHEALTHCARE INSURANCE COMPANY,UnitedHealthcare,"UnitedHealth Group, Inc.",3,Plan too new to be measured,Plan too new to be measured,Plan too new to be measured,Plan too new to be measured,...,Not enough data available,5,4,Plan too new to be measured,4,No,Plan too new to be measured,4,Plan too new to be measured,2016 Fall
freq,2,938,28,78,114,292,244,244,412,412,...,370,446,440,252,622,820,244,284,238,642
