In [1]:
import pyspark
import pyspark.sql

import pandas

from data_algebra.data_ops import *
import data_algebra.PostgreSQL

In [2]:
d_local = pandas.DataFrame({
    'subjectID':[1, 1, 2, 2],
    'surveyCategory': [ "withdrawal behavior", "positive re-framing", "withdrawal behavior", "positive re-framing"],
    'assessmentTotal': [5, 2, 3, 4],
    'irrelevantCol1': ['irrel1']*4,
    'irrelevantCol2': ['irrel2']*4,
})
d_local

Unnamed: 0,subjectID,surveyCategory,assessmentTotal,irrelevantCol1,irrelevantCol2
0,1,withdrawal behavior,5,irrel1,irrel2
1,1,positive re-framing,2,irrel1,irrel2
2,2,withdrawal behavior,3,irrel1,irrel2
3,2,positive re-framing,4,irrel1,irrel2


In [3]:
sc = pyspark.SparkContext()
spark = pyspark.sql.SparkSession.builder.appName('pandasToSparkDF').getOrCreate()

d_spark = spark.createDataFrame(d_local)
d_spark

DataFrame[subjectID: bigint, surveyCategory: string, assessmentTotal: bigint, irrelevantCol1: string, irrelevantCol2: string]

In [4]:
d_spark.createOrReplaceTempView("d")
sql_df = spark.sql("SELECT * FROM d")
sql_df.show()

+---------+-------------------+---------------+--------------+--------------+
|subjectID|     surveyCategory|assessmentTotal|irrelevantCol1|irrelevantCol2|
+---------+-------------------+---------------+--------------+--------------+
|        1|withdrawal behavior|              5|        irrel1|        irrel2|
|        1|positive re-framing|              2|        irrel1|        irrel2|
|        2|withdrawal behavior|              3|        irrel1|        irrel2|
|        2|positive re-framing|              4|        irrel1|        irrel2|
+---------+-------------------+---------------+--------------+--------------+



In [5]:
local_copy = pandas.DataFrame(sql_df.collect())
local_copy

Unnamed: 0,0,1,2,3,4
0,1,withdrawal behavior,5,irrel1,irrel2
1,1,positive re-framing,2,irrel1,irrel2
2,2,withdrawal behavior,3,irrel1,irrel2
3,2,positive re-framing,4,irrel1,irrel2


In [6]:
scale = 0.237

with data_algebra.env.Env(locals()) as env:
    ops = data_algebra.data_ops.describe_table(d_local, 'd'). \
        extend({'probability': '(assessmentTotal * scale).exp()'}). \
        extend({'total': 'probability.sum()'},
               partition_by='subjectID'). \
        extend({'probability': 'probability/total'}). \
        extend({'sort_key': '-probability'}). \
        extend({'row_number': '_row_number()'},
               partition_by=['subjectID'],
               order_by=['sort_key']). \
        select_rows('row_number == 1'). \
        select_columns(['subjectID', 'surveyCategory', 'probability']). \
        rename_columns({'diagnosis': 'surveyCategory'})
    
print(ops.to_python(pretty=True))

TableDescription(
    table_name="d",
    column_names=[
        "subjectID",
        "surveyCategory",
        "assessmentTotal",
        "irrelevantCol1",
        "irrelevantCol2",
    ],
).extend({"probability": "(assessmentTotal * 0.237).exp()"}).extend(
    {"total": "probability.sum()"}, partition_by=["subjectID"]
).extend(
    {"probability": "probability / total"}
).extend(
    {"sort_key": "-probability"}
).extend(
    {"row_number": "_row_number()"}, partition_by=["subjectID"], order_by=["sort_key"]
).select_rows(
    "row_number == 1"
).select_columns(
    ["subjectID", "surveyCategory", "probability"]
).rename_columns(
    {"diagnosis": "surveyCategory"}
)



In [7]:
# Use PostgreSQL SQL model as a stand-in for Spark SQL model 
db_model = data_algebra.PostgreSQL.PostgreSQLModel()
sql = ops.to_sql(db_model, pretty=True)
# need to finish Spark emitter
sql = sql.replace('"', '')
print(sql)

SELECT probability,
       subjectid,
       surveycategory AS diagnosis
FROM
  (SELECT surveycategory,
          probability,
          subjectid
   FROM
     (SELECT surveycategory,
             probability,
             subjectid
      FROM
        (SELECT surveycategory,
                probability,
                sort_key,
                subjectid,
                ROW_NUMBER() OVER (PARTITION BY subjectid
                                   ORDER BY sort_key) AS row_number
         FROM
           (SELECT surveycategory,
                   probability,
                   subjectid,
                   (-probability) AS sort_key
            FROM
              (SELECT surveycategory,
                      subjectid,
                      probability / total AS probability
               FROM
                 (SELECT surveycategory,
                         probability,
                         subjectid,
                         SUM(probability) OVER (PARTITION BY subjectid) AS tota

In [8]:
sql_q = spark.sql(sql)


In [9]:
res = pandas.DataFrame(sql_q.collect())
res

Unnamed: 0,0,1,2
0,0.670622,1,withdrawal behavior
1,0.558974,2,positive re-framing


In [10]:
sc.stop()