In [None]:
import re, time
import pandas as pd
import pyspark as ps
from pyspark.sql import DataFrameWriter
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, ArrayType, DoubleType
from pyspark.sql.functions import *
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, MinHashLSH, BucketedRandomProjectionLSH
from pyspark.ml import Pipeline
from pyspark.mllib.linalg.distributed import RowMatrix

In [3]:
spark = ps.sql.SparkSession.builder \
            .appName("capstone") \
            .getOrCreate()
#             .master("local[64]") \
#             .config('spark.driver.extraClassPath','postgresql-9.1-901-1.jdbc4.jar') \
            

sc = spark.sparkContext  # for the pre-2.0 sparkContext

In [4]:
#Checking if Spark Context is running --> RDDS and SQL Context is running --> Dataframes
sc, sqlCtx

(<SparkContext master=local[*] appName=PySparkShell>,
 <pyspark.sql.context.SQLContext at 0x7fb02a423978>)

http://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader.csv

In [None]:
# schema = StructType( [
#     StructField('NPI', IntegerType(), True), 
#     StructField('Entity Type Code', IntegerType(), True),
#     StructField('Replacement NPI', IntegerType(), True)
#     ] )

In [None]:
link = 's3n://gschoolcapstone/npidata_20050523-20170813.csv'
# .option("maxColumns", 309)
df = spark.read.csv(link, header=True, inferSchema=True)
# .limit(100)

In [None]:
# df.write.json('subset')
# df.write.format('json').save('../data/subset.json')
# df = spark.read.json('../data/subset.json')

In [None]:
# Rename columns in proper format
cols = df.columns
new_cols = [col.replace('(', '').replace(')', '').replace('.', '').replace(' ', '_') for col in cols]
for old, new in zip(cols, new_cols):
    df = df.withColumnRenamed(old, new)

http://spark.apache.org/docs/2.2.0/api/python/pyspark.sql.html#pyspark-sql-module

In [None]:
df.createOrReplaceTempView("npi")
# df.registerTempTable('npi')
# spark.sql('SELECT * FROM npi').show()

In [None]:
# Filter where NPIs are active... do updates have reactivated providers?
df = df.filter(df.Entity_Type_Code.isNotNull())
# spark.sql('SELECT Entity_Type_Code FROM npi GROUP BY Entity_Type_Code').show()

In [None]:
# Provider_Gender_Code: M, F, null, GUTHMILLER
# df.select('Provider_Gender_Code').filter("Provider_Gender_Code == 'GUTHMILLER'").show()
df = df.withColumn('Gender', regexp_replace(df.Provider_Gender_Code, 'GUTHMILLER', 'X'))
# df = df.regexp_replace('GUTHMILLER', 'X', subset='Provider_Gender_Code')

In [None]:
# df.createOrReplaceTempView("npi")
# spark.sql('SELECT Gender FROM npi GROUP BY Gender').show()
# spark.sql('SELECT Provider_Gender_Code FROM npi GROUP BY Provider_Gender_Code').show()

In [None]:
# Fill na values
na_dict = {'Gender': 'X', 
           'Is_Sole_Proprietor': 'X', 
           'Is_Organization_Subpart': 'X',
           'Provider_Credential_Text': 'X'}
#            'Healthcare_Provider_Taxonomy_Code_1': 'NA',
#            'Healthcare_Provider_Taxonomy_Code_2': 'NA',
#            'Healthcare_Provider_Taxonomy_Code_3': 'NA',
#            'Healthcare_Provider_Taxonomy_Code_4': 'NA',
#            'Healthcare_Provider_Taxonomy_Code_5': 'NA',
#            'Healthcare_Provider_Taxonomy_Code_6': 'NA',
#            'Healthcare_Provider_Taxonomy_Code_7': 'NA',
#            'Healthcare_Provider_Taxonomy_Code_8': 'NA',
#            'Healthcare_Provider_Taxonomy_Code_9': 'NA',
#            'Healthcare_Provider_Taxonomy_Code_10': 'NA',
#            'Healthcare_Provider_Taxonomy_Code_11': 'NA',
#            'Healthcare_Provider_Taxonomy_Code_12': 'NA',
#            'Healthcare_Provider_Taxonomy_Code_13': 'NA',
#            'Healthcare_Provider_Taxonomy_Code_14': 'NA',
#            'Healthcare_Provider_Taxonomy_Code_15': 'NA', 
df = df.na.fill(na_dict)

In [None]:
def formatting(x):
    x = x.upper()
    x = re.sub(re.compile('\.|\>|\`'), '', x)
    x = re.sub(re.compile('\,|\;|\-|\(|\)|\/'), ' ', x)
    x = re.sub(re.compile('\s+'), ' ', x)
#     x = x.replace('M D', 'MD')
    x = re.sub("M D", "MD", x)
    x = re.sub("D C", "DC", x)
    x = re.sub("P C", "PC", x)
    x = re.sub("D P M", "DPM", x)
    x = re.sub("D O", "DO", x)
    x = re.sub("O D", "OD", x)
    x = re.sub("0D", "OD", x)
    x = re.sub("PHARMD", "RPH ", x)
    x = re.sub("PHYSICIAN ASSISTANT", "PA", x)
    x = re.sub("NURSE PRACTITIONER", "NP", x)
    x = re.sub("PHYSICAL THERAPIST", "PT", x)
    x = re.sub("(BS IN PHARMACY|BS PHARMACY|DOCTOR OF PHARMACY|PHARMACIST|PHARMD)", " RPH ", x)
    x = re.sub("[\d]", "", x) # remove numbers
    x = x.strip()
    return x

format_udf = udf(formatting, StringType())
# print(formatting('hey. this is john. . . .'))

In [None]:
# df = df.withColumn('test', format_udf(col('Provider_Credential_Text')))
# df.show(5)
# test = df.withColumn('new', regexp_replace(df.Provider_Credential_Text, '\.', ''))
# test.select('Provider_Credential_Text','new').show(10)

df = df.withColumn('Credentials', format_udf(df.Provider_Credential_Text))
# df.select('Provider_Credential_Text', 'Credentials').show(5)

In [None]:
df.select('Gender','Is_Sole_Proprietor','Is_Organization_Subpart','Credentials').show(5)

### Spark pipeline to get feature vector

In [None]:
stages, feature_cols = [], []

In [None]:
for col in my_cols:
    stages.append(StringIndexer(inputCol=col, outputCol=col+'_idx', handleInvalid='error'))
    stages.append(OneHotEncoder(dropLast=True, inputCol=col+'_idx', outputCol=col+'_ohe'))
    feature_cols.append(col+'_ohe')
stages.append(VectorAssembler(inputCols=feature_cols, outputCol='features'))

In [None]:
# df = df.drop('Provider_Gender_Code_idx')
# df = df.drop('Provider_Gender_Code_ohe')

In [None]:
col = 'Provider_Gender_Code'
stridx = StringIndexer(inputCol=col, outputCol=col+'_idx', handleInvalid='error')

In [None]:
# model = stridx.fit(df)
# df = model.transform(df)
# {i: label for i, label in enumerate(model.labels)}

In [None]:
# df.select('Gender').show()

In [None]:
# df.columns

In [None]:
ohe = OneHotEncoder(dropLast=True, inputCol=col+'_idx', outputCol=col+'_ohe')

In [None]:
# df = ohe.transform(df)

In [None]:
# df.columns

In [None]:
# df.select('Gender').show()
# df.select('Gender_').show()

In [None]:
features = [col+'_ohe']
va = VectorAssembler(inputCols=features, outputCol='features')

https://spark.apache.org/docs/1.6.1/ml-guide.html#example-pipeline

In [None]:
pipeline = Pipeline(stages = [stridx, ohe, va])
model = pipeline.fit(df)
df = model.transform(df)

In [None]:
df.select('NPI', 'features').show(5)

In [None]:
# cache processed dataframe/model
df.persist() 
# df.persist(pyspark.StorageLevel.MEMORY_AND_DISK)
# df.unpersist()

### MinHash LSH example
https://janzhou.org/lsh/   
https://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html#pyspark.ml.feature.MinHashLSH  
https://github.com/apache/spark/blob/master/examples/src/main/python/ml/min_hash_lsh_example.py   
https://github.com/evancasey/spark-knn-recommender/blob/master/algorithms/itemSimilarity.py  

In [3]:
data = [(0, Vectors.sparse(6, [0, 1, 2], [1.0, 1.0, 1.0]),),
        (1, Vectors.sparse(6, [2, 3, 4], [1.0, 1.0, 1.0]),),
        (2, Vectors.sparse(6, [0, 2, 4], [1.0, 1.0, 1.0]),),
        (3, Vectors.sparse(6, [1, 4, 5], [1.0, 1.0, 1.0]),),
        (4, Vectors.sparse(6, [2, 3, 5], [1.0, 1.0, 1.0]),),
        (5, Vectors.sparse(6, [1, 2, 4], [1.0, 1.0, 1.0]),)]
ex = spark.createDataFrame(data, ["id", "features"])
print(type(ex))
ex.show()

<class 'pyspark.sql.dataframe.DataFrame'>
+---+--------------------+
| id|            features|
+---+--------------------+
|  0|(6,[0,1,2],[1.0,1...|
|  1|(6,[2,3,4],[1.0,1...|
|  2|(6,[0,2,4],[1.0,1...|
|  3|(6,[1,4,5],[1.0,1...|
|  4|(6,[2,3,5],[1.0,1...|
|  5|(6,[1,2,4],[1.0,1...|
+---+--------------------+



In [4]:
ex = ex.drop('hashes')
mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=2, seed=123)
m = mh.fit(ex)
ex = m.transform(ex)
ex.show()

+---+--------------------+--------------------+
| id|            features|              hashes|
+---+--------------------+--------------------+
|  0|(6,[0,1,2],[1.0,1...|[[-6.7948028E8], ...|
|  1|(6,[2,3,4],[1.0,1...|[[-1.86843801E9],...|
|  2|(6,[0,2,4],[1.0,1...|[[-3.15433227E8],...|
|  3|(6,[1,4,5],[1.0,1...|[[-6.7948028E8], ...|
|  4|(6,[2,3,5],[1.0,1...|[[-1.86843801E9],...|
|  5|(6,[1,2,4],[1.0,1...|[[-6.7948028E8], ...|
+---+--------------------+--------------------+



https://spark.apache.org/docs/2.1.1/ml-features.html#approximate-nearest-neighbor-search

In [5]:
id=4
key = ex.filter('id = {}'.format(id)).select('id', 'features').collect()
# print(type(key[0][1])) # <class 'pyspark.ml.linalg.SparseVector'>
# print(key[0][1],'\n') # (6,[2,3,5],[1.0,1.0,1.0]) 
neighbors=2
nn = m.approxNearestNeighbors(ex, key[0][1], neighbors+1).select('id').collect()
print('Top {} most similar to {}:'.format(neighbors, id), [n[0] for n in nn[1:]])

Top 2 most similar to 4: [1, 0]


In [6]:
d = m.approxSimilarityJoin(ex, ex, .99, distCol='JaccardDistance')

In [7]:
d.show(2)

+--------------------+--------------------+---------------+
|            datasetA|            datasetB|JaccardDistance|
+--------------------+--------------------+---------------+
|[5,(6,[1,2,4],[1....|[1,WrappedArray([...|            0.5|
|[5,(6,[1,2,4],[1....|[0,WrappedArray([...|            0.5|
+--------------------+--------------------+---------------+
only showing top 2 rows



In [8]:
d = d.withColumn('id_A', d.datasetA.id)
d = d.withColumn('id_B', d.datasetB.id)
d = d.filter('id_A != id_B')
d.show(2)

+--------------------+--------------------+---------------+----+----+
|            datasetA|            datasetB|JaccardDistance|id_A|id_B|
+--------------------+--------------------+---------------+----+----+
|[5,(6,[1,2,4],[1....|[1,WrappedArray([...|            0.5|   5|   1|
|[5,(6,[1,2,4],[1....|[0,WrappedArray([...|            0.5|   5|   0|
+--------------------+--------------------+---------------+----+----+
only showing top 2 rows



In [9]:
from pyspark.sql import functions as F
# d.groupby("id_A").agg(F.collect_set("id_B"), F.collect_list("id_B",)).show()

# results = d.groupby("id_A").agg(F.collect_list(struct("id_B", "JaccardDistance")).alias("combos"))
results = d.orderBy('id_A', 'JaccardDistance').groupby("id_A")\
            .agg(F.collect_list(struct("id_B", "JaccardDistance")).alias("combos"))
results.toPandas() # nicer output than show
results.show(4, False) 
# False to show without truncating

+----+---------------------------------------------+
|id_A|combos                                       |
+----+---------------------------------------------+
|0   |[[2,0.5], [5,0.5], [4,0.8], [3,0.8], [1,0.8]]|
|5   |[[1,0.5], [0,0.5], [2,0.5], [3,0.5], [4,0.8]]|
|1   |[[4,0.5], [5,0.5], [2,0.5], [0,0.8]]         |
|3   |[[5,0.5], [0,0.8]]                           |
+----+---------------------------------------------+
only showing top 4 rows



In [10]:
# results.withColumn('combos', explode('combos')).show() # reverses groupby/agg

In [11]:
# results.withColumn('first', results.combos[0]).show(5)
results = results.rdd.map(lambda row: row + row.combos[0] + row.combos[1]).toDF()
results.show()

+---+--------------------+---+---+---+---+
| _1|                  _2| _3| _4| _5| _6|
+---+--------------------+---+---+---+---+
|  0|[[2,0.5], [5,0.5]...|  2|0.5|  5|0.5|
|  5|[[1,0.5], [0,0.5]...|  1|0.5|  0|0.5|
|  1|[[4,0.5], [5,0.5]...|  4|0.5|  5|0.5|
|  3|  [[5,0.5], [0,0.8]]|  5|0.5|  0|0.8|
|  2|[[5,0.5], [1,0.5]...|  5|0.5|  1|0.5|
|  4|[[1,0.5], [0,0.8]...|  1|0.5|  0|0.8|
+---+--------------------+---+---+---+---+



In [12]:
results.withColumnRenamed('_1', 'id').withColumnRenamed('_3', 'id1').withColumnRenamed('_5', 'id2')\
        .select('id', 'id1', 'id2').show()

+---+---+---+
| id|id1|id2|
+---+---+---+
|  0|  2|  5|
|  5|  1|  0|
|  1|  4|  5|
|  3|  5|  0|
|  2|  5|  1|
|  4|  1|  0|
+---+---+---+



In [None]:
# df_as1 = d.alias("df_as1").select('id_A', 'id_B', 'JaccardDistance')
# df_as2 = d.alias("df_as2").select('id_A', 'id_B', 'JaccardDistance')
# df_as1.join(df_as2, col("df_as1.id_A") == col("df_as2.id_B"), 'inner').show()

In [None]:
d.select('id_A', 'id_B', 'JaccardDistance').coalesce(1)\
    .write.format("com.databricks.spark.csv") .option("header", "true").save("example.csv")

### Now with NPI data...

In [5]:
# cdf = spark.read.csv('s3n://gschoolcapstone/npidata_20050523-20170813_clean.csv', \
#                      header=True, inferSchema=True)
cdf = spark.read.csv('npidata_20050523-20170813_clean.csv', \
                     header=True, inferSchema=True).limit(100)

In [6]:
rdd = cdf.rdd
npi = rdd.map(lambda x: x[0])
features = rdd.map(lambda x: x[1:])
feature_cols = cdf.columns[1:]
# feature_cols = list(feature_cols.asDict().values())[1:]

without pipeline...

In [7]:
va = VectorAssembler(inputCols=feature_cols, outputCol='features')
cdf = va.transform(cdf)
# cdf.select("NPI", "features").show(5)

In [8]:
# cdf = cdf.drop('hashes')
mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=10, seed=123)
model = mh.fit(cdf)
cdf = model.transform(cdf)
# cdf.select("NPI", "features", "hashes").show(5)
# cdf.select('NPI','features', 'hashes').toPandas()

with pipeline... 

In [105]:
# stages = []
# stages.append(VectorAssembler(inputCols=feature_cols, outputCol='features'))
# hash_slices = 4
# stages.append(MinHashLSH(inputCol='features', outputCol='hashes', numHashTables=hash_slices, seed=123))

In [108]:
# cdf = cdf.drop('features', 'hashes')
# pipeline = Pipeline(stages=stages)
# model = pipeline.fit(cdf)
# cdf = model.transform(cdf)

finding similarity...

In [9]:
distances = model.approxSimilarityJoin(cdf, cdf, .5, distCol='JaccardDistance')

In [10]:
distances = distances.withColumn('NPI', distances.datasetA.NPI)
distances = distances.withColumn('NPI_similar', distances.datasetB.NPI)
distances = distances.drop('datasetA', 'datasetB')
distances.columns

['JaccardDistance', 'NPI', 'NPI_similar']

write results to csv...

In [21]:
# distances.select('NPI', 'NPI_similar', 'JaccardDistance').coalesce(1)\
#     .write.format("com.databricks.spark.csv").option("header", "true").save("s3n://gschoolcapstone/distances")

In [None]:
distances.select('NPI', 'NPI_similar', 'JaccardDistance').write.csv('distances', \
                                                                    header=True, mode='overwrite')

write results to Postgres database...

In [11]:
my_writer = DataFrameWriter(distances)

In [12]:
# format jdbc:postgresql://host:port/database
url = "jdbc:postgresql://128.177.113.102:5432/capstone"
table = "distances10000"
mode = "overwrite" # or "append"
properties = {"user":"postgres", "password":"postgres", "driver": "org.postgresql.Driver"}

In [None]:
my_writer.jdbc(url, table, mode, properties)

In [None]:
# OR
# distances.write.jdbc(url=url, table="similarity", mode=mode, properties=properties)

viewing results in notebook...

In [136]:
# distances.orderBy('NPI', 'JaccardDistance').toPandas()

In [135]:
# distances.orderBy('NPI', 'JaccardDistance').filter('NPI != NPI_similar').toPandas()

In [17]:
distances.createOrReplaceTempView("sim")
# spark.sql("SELECT NPI, NPI_similar, JaccardDistance FROM sim \
#             WHERE NPI != NPI_similar \
#             ORDER BY NPI, JaccardDistance").show()

In [18]:
spark.sql("SELECT NPI, NPI_similar, JaccardDistance FROM sim \
            WHERE NPI != NPI_similar AND NPI == 1215930367 \
            ORDER BY NPI, JaccardDistance \
            LIMIT 20").show()

# 1215930367 - 1306849450 (.375), 1588667638 (.555), 1679576722 (.666)
# 1578547329

+----------+-----------+------------------+
|       NPI|NPI_similar|   JaccardDistance|
+----------+-----------+------------------+
|1215930367| 1215930268|               0.0|
|1215930367| 1851394886|0.1428571428571429|
|1215930367| 1629071667|0.1428571428571429|
|1215930367| 1336142553|              0.25|
|1215930367| 1558364588|              0.25|
|1215930367| 1326041500|0.2857142857142857|
|1215930367| 1013910272|             0.375|
|1215930367| 1316940588|             0.375|
|1215930367| 1992708168|             0.375|
|1215930367| 1003819269|             0.375|
|1215930367| 1699778860|             0.375|
|1215930367| 1174526347|             0.375|
|1215930367| 1740283738|             0.375|
|1215930367| 1740283753|             0.375|
|1215930367| 1881697894|             0.375|
|1215930367| 1871596098|             0.375|
|1215930367| 1710980941|             0.375|
|1215930367| 1124021134|             0.375|
|1215930367| 1114920238|             0.375|
|1215930367| 1295738219|        

In [14]:
npi=1215930367
key = cdf.filter('NPI = {}'.format(npi)).select('NPI', 'features').collect()
# print(type(key[0][1])) # <class 'pyspark.ml.linalg.SparseVector'>
# print(key[0][1],'\n') # (6,[2,3,5],[1.0,1.0,1.0]) 

In [15]:
neighbors=20
nn = model.approxNearestNeighbors(cdf, key[0][1], neighbors+1, distCol='NeighborDistance').select('NPI').collect()
print('Top {} most similar to {}:'.format(neighbors, npi), [n[0] for n in nn[1:]])

Top 20 most similar to 1215930367: [1952305591, 1215930268, 1932140159, 1235168808, 1962406462, 1215930367, 1669612123, 1639278559, 1972608594, 1639273956, 1356383103, 1114020922, 1164529806, 1609972801, 1700980299, 1821195132, 1184723827, 1376641308, 1750480950, 1194829267]


### MLlib Similarity Matrix (depracated)
https://spark.apache.org/docs/2.1.1/api/python/pyspark.mllib.html#pyspark.mllib.linalg.distributed.RowMatrix

In [None]:
row_rdd = sc.parallelize([[1, 2], [1, 5]])
# RowMatrix(rdd, numRows=0, numCols=0)
mat = RowMatrix(row_rdd)
sims = mat.columnSimilarities()
sims.entries.first().value

In [None]:
# len(features.take(1)[0]) # 938
features.top(2)

In [None]:
mat = RowMatrix(features, numRows=5205376, numCols=938)
sims = mat.columnSimilarities()
sims.entries.first().value