# Imports

In [1]:
from pyspark import SparkContext as sparkc
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql import DataFrameReader

# Code

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [3]:
schema = StructType([
    StructField("START", StringType(), True),
    StructField("STOP", StringType(), True),
    StructField("PATIENT", StringType(), True),
    StructField("ENCOUNTER", StringType(), True),
    StructField("CODE", IntegerType(), True),
    StructField("DESCRIPTION", StringType(), True)])

df = spark.read.csv("./Assignment1/conditions.csv",header=True,schema=schema)
# df = spark.read.csv("Assignment1/conditionsample.csv",header=True,schema=schema).select('PATIENT','CODE')

df.printSchema()
df.show()

root
 |-- START: string (nullable = true)
 |-- STOP: string (nullable = true)
 |-- PATIENT: string (nullable = true)
 |-- ENCOUNTER: string (nullable = true)
 |-- CODE: integer (nullable = true)
 |-- DESCRIPTION: string (nullable = true)

+----------+----------+--------------------+--------------------+---------+--------------------+
|     START|      STOP|             PATIENT|           ENCOUNTER|     CODE|         DESCRIPTION|
+----------+----------+--------------------+--------------------+---------+--------------------+
|2017-01-14|2017-03-30|09e4e8cb-29c2-4ef...|88e540ab-a7d7-47d...| 65363002|        Otitis media|
|2012-09-15|2012-09-16|b0a03e8c-8d0f-424...|e89414dc-d0c6-478...|241929008|Acute allergic re...|
|2018-06-17|2018-06-24|09e4e8cb-29c2-4ef...|c14325b0-f7ec-431...|444814009|Viral sinusitis (...|
|2019-04-19|2019-09-26|09e4e8cb-29c2-4ef...|71af18ee-3157-408...| 65363002|        Otitis media|
|2019-04-27|2019-05-18|09e4e8cb-29c2-4ef...|411d4eae-72d1-478...|444814009|Viral s

## PATIENT - CODE

In [4]:
patients = df.select('PATIENT', 'CODE')
patients.show()

+--------------------+---------+
|             PATIENT|     CODE|
+--------------------+---------+
|09e4e8cb-29c2-4ef...| 65363002|
|b0a03e8c-8d0f-424...|241929008|
|09e4e8cb-29c2-4ef...|444814009|
|09e4e8cb-29c2-4ef...| 65363002|
|09e4e8cb-29c2-4ef...|444814009|
|09e4e8cb-29c2-4ef...| 33737001|
|b0a03e8c-8d0f-424...|444814009|
|b0a03e8c-8d0f-424...| 10509002|
|b0a03e8c-8d0f-424...|233678006|
|b0a03e8c-8d0f-424...|195662009|
|b0a03e8c-8d0f-424...|232353008|
|b0a03e8c-8d0f-424...|195662009|
|5420ae87-24c8-4ed...|446096008|
|5420ae87-24c8-4ed...|284551006|
|5420ae87-24c8-4ed...|283371005|
|5420ae87-24c8-4ed...| 72892002|
|5420ae87-24c8-4ed...|444814009|
|5420ae87-24c8-4ed...|195662009|
|bf1f30f2-27de-4b5...|162864005|
|bf1f30f2-27de-4b5...|283385000|
+--------------------+---------+
only showing top 20 rows



## CODE - DESCRIPTION (DISTINCT)

In [5]:
diagnosis = df.select('CODE', 'DESCRIPTION').distinct()
diagnosis.show()

+---------+--------------------+
|     CODE|         DESCRIPTION|
+---------+--------------------+
|446096008|Perennial allergi...|
| 88805009|Chronic congestiv...|
| 65363002|        Otitis media|
| 79586000|     Tubal pregnancy|
| 95417003|Primary fibromyal...|
| 74400008|        Appendicitis|
|262574004|        Bullet wound|
|236077008| Protracted diarrhea|
| 44054006|            Diabetes|
|443165006|Pathological frac...|
|444814009|Viral sinusitis (...|
|399211009|History of myocar...|
|241929008|Acute allergic re...|
|110030002|Concussion injury...|
|200936003| Lupus erythematosus|
|444470001|Injury of anterio...|
|307731004|Injury of tendon ...|
|196416002|     Impacted molars|
| 43878008|Streptococcal sor...|
|424132000|Non-small cell ca...|
+---------+--------------------+
only showing top 20 rows



## Functions

In [144]:
def tuples(x):
    return set(x)

### Spark Functions

In [6]:
codeGroupedByPatients = df.rdd.map(lambda x: (x.PATIENT, x.CODE)).groupByKey().mapValues(set)
codeGroupedByPatients.take(10)
# codeGroupedByPatients.collect()

[('28a3cdb7-1db1-4148-8280-8a4e5b4f99e0',
  {19169002, 72892002, 156073000, 284551006}),
 ('3826037f-19e0-4c7b-98e5-4e9578472f67',
  {24079001, 55822004, 65966004, 162864005}),
 ('e32e0069-2d3f-4b7b-b420-3269c94723ad', {16114001, 162864005, 195662009}),
 (None, {None}),
 ('887ad9bb-bd72-44cf-8e5e-8aff7fbdeed4',
  {40275004, 44465007, 72892002, 195662009, 444814009}),
 ('8e763f75-614b-4ef7-aa86-ce459dd3142e',
  {10509002, 70704007, 128613002, 195662009, 703151001}),
 ('8b0755cd-54d4-48e2-a163-4bf04e47f2f2',
  {15777000,
   36971009,
   38822007,
   40055000,
   59621000,
   195662009,
   271737000,
   444814009,
   446096008}),
 ('2593819d-f0ff-470b-95da-656e8340255c',
  {10509002,
   19169002,
   35999006,
   72892002,
   195662009,
   198992004,
   232353008,
   398254007,
   444814009}),
 ('de087296-4f63-40b4-94f0-fc0dd91df200',
  {10509002, 162864005, 195662009, 408512008, 444814009}),
 ('4a181f3d-0937-466a-a503-d449aea0dbfa', {70704007, 75498004})]

# Frequent Items Table

In [15]:
def frequent_items_table(itemCounts):
#     for item in itemCounts:
#     itemCounts.sortBy(values);
#     sortedItemCounts = sorted(itemCounts.items(), key=lambda kv: kv[1], reverse=True)
#     print(sortedItemCounts)
#     return sortedItemCounts
#     print(sorted(itemCounts.items(), reverse=True))
    
    print(itemCounts.items())
    frequentItems = list()
    supportThreshold = 1000
#     for item in itemCounts:
    for key, value in itemCounts.items():
#         print("%d - %d" % (key, value))
#         break
#         if value >= supportThreshold and key is not "None":
        if key is None:
#         if item is None:
            continue
        if value >= supportThreshold:
            frequentItems.append(key)
#             frequentItems.add(item)
    return frequentItems

#     supportThreshold = 1000
#     return list(map(lambda i: i > supportThreshold, itemCounts))

# A-Priori

In [8]:
from collections import defaultdict

def apriori(codePatients):
    item_counts, pair_counts = defaultdict(int),  defaultdict(int)
#     item_counts, pair_counts = {}, {}
    for basket in codePatients.collect():
#         print(basket)
        for item in basket[1]:
#             print(item)
#             if item not in item_counts:
#                 item_counts[item] = 1
#             else:
                item_counts[item] += 1

#     print(item_counts)
    frequent_items = frequent_items_table(item_counts)

    # for (item in basket):
    for i in codePatients.collect():
        if i not in frequent_items:
            continue
        for j in codePatients.collect():
            if j in frequent_items:
                pair_counts[i, j] += 1
#     return pair_counts
#     print(pair_counts)

In [18]:
result = apriori(codeGroupedByPatients)
# print(result)

ERROR:py4j.java_gateway:An error occurred while trying to connect to the Java server (127.0.0.1:44313)
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 929, in _get_connection
    connection = self.deque.pop()
IndexError: pop from an empty deque

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1067, in start
    self.socket.connect((self.address, self.port))
ConnectionRefusedError: [Errno 111] Connection refused


Py4JNetworkError: An error occurred while trying to connect to the Java server (127.0.0.1:44313)

In [None]:
# % first pass
f o r ( eac h b a s k e t )
f o r ( eac h i t em i i n b a s k e t )
i tem_co u n t s [ i ] += 1
# % create frequent item stable
f r e q u e n t _ i t em s = f r e q u e n t _ i t em s _ t a b l e ( i tem_co u n t s )
# % second pass
f o r ( eac h i t em i i n b a s k e t )
i f i no t i n f r e q u e n t _ i t e m s : c o n t i n u e
f o r ( eac h i t em j i n b a s k e t ) %w i t h j > i
i f j i n f r e q u e n t _ i t e m s
p a i r_ c o u n t s [ i , j ] += 1

### TESTES

In [127]:
test = df.rdd.map(lambda x: (x.PATIENT, x.CODE)).take(2)
print(test)

# for t in test:
#     print(t)

# for row in test:
#     print("{} has {}".format(
#         row["PATIENT"],
#         row["CODE"]))

[('09e4e8cb-29c2-4ef4-86c0-a6ff0ba25d2a', 65363002), ('b0a03e8c-8d0f-4242-9548-40f4d294eba8', 241929008)]


In [126]:
test2 = df.rdd.map(lambda x: (x.PATIENT, x.CODE)).groupByKey().mapValues(set)
# test2.fullOuterJoin(test2).take(5)
# print(test2)
# test2.mapValues(set)
# test2.mapValues(lambda code: code).mapValues(set).take(5)
# test2.mapValues(lambda code: code.lower()).take(4)
test2.take(5)

# pair = test2.groupByKey().take(1)
# print("%s:%s" % (test2[0], ",".join([n for n in test2[1]])))

[('5420ae87-24c8-4ed4-ad14-041d15aadae9',
  {72892002, 195662009, 283371005, 284551006, 444814009, 446096008}),
 ('90f0b8d0-3888-415f-8234-d68f7beab894',
  {10509002, 44465007, 72892002, 195662009, 307731004, 444814009}),
 ('3826037f-19e0-4c7b-98e5-4e9578472f67',
  {24079001, 55822004, 65966004, 162864005}),
 ('b295d437-e4a3-4699-9e96-e4a29d5f7aa1',
  {40055000, 59621000, 64859006, 70704007, 162864005, 444814009}),
 ('4a772dc0-bd42-4b84-b55e-053389020c57',
  {10509002,
   15777000,
   19169002,
   40055000,
   55822004,
   72892002,
   75498004,
   82423001,
   156073000,
   444814009})]

In [None]:
'''sc = sparkc(appName="Assignemnt1-2_conditions")
textfile = sc.textFile('./Assignment1/conditionsample.csv')'''

In [None]:
'''spark.read.format('csv').options(header='true', inferSchema='true')
    .load('zipcodes.csv')'''

In [None]:
val schema = new StructType()
    .add("START",IntegerType,true)
    .add("STOP",IntegerType,true)
    .add("PATIENT",StringType,true)
    .add("ENCOUNTER",StringType,true)
    .add("CODE",StringType,true)
    .add("DESCRIPTION",StringType,true)

val df_with_schema = spark.read.format("csv")
    .option("header", "true")
    .schema(schema)
    .load("src/main/resources/zipcodes.csv")
df_with_schema.printSchema()
df_with_schema.show(false)

In [None]:
df = textfile.split(',')
type(df)

In [None]:
textfile.count()