In [None]:
from time import time
from pyspark import SparkContext
# Rough scaling benchmark: for each master setting local[1] .. local[9],
# time ten sum-reductions over a 2,000,000-element list of 1s and 2s.
for j in range(1,10):
    sc = SparkContext(master=f'local[{j}]')
    try:
        t0 = time()
        for i in range(10):
            sc.parallelize([1,2] * 1000000).reduce(lambda x,y:x+y)
        print(f'{j} executors, time= {time() - t0}')
    finally:
        # Stop the context even when a job fails; otherwise it stays alive and
        # the next SparkContext(...) call cannot start.
        sc.stop()

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.functions import avg, col, expr, collect_list, concat, lit, regexp_replace, round, split, struct, udf, UserDefinedFunction, substring, length  
from pyspark.sql.types import StringType, IntegerType, FloatType, ArrayType, StructType, StructField

In [3]:
# Local Spark session using every available core.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('analytics-commie')
    .getOrCreate()
)

# Explicit schema for the individual-contribution files, one entry per column:
# (column name, Spark type class, nullable).
_INDIV_COLUMNS = [
    ("CMTE_ID", StringType, False),
    ("AMNDT_IND", StringType, True),
    ("RPT_TP", StringType, True),
    ("TRANSACTION_PGI", StringType, True),
    ("IMAGE_NUM", StringType, True),
    ("TRANSACTION_TP", StringType, True),
    ("ENTITY_TP", StringType, True),
    ("NAME", StringType, True),
    ("CITY", StringType, True),
    ("STATE", StringType, True),
    ("ZIP_CODE", StringType, True),
    ("EMPLOYER", StringType, True),
    ("OCCUPATION", StringType, True),
    ("TRANSACTION_DT", IntegerType, True),
    ("TRANSACTION_AMT", IntegerType, True),
    ("OTHER_ID", StringType, True),
    ("TRAN_ID", StringType, True),
    ("FILE_NUM", IntegerType, True),
    ("MEMO_CD", StringType, True),
    ("MEMO_TEXT", StringType, True),
    ("SUB_ID", StringType, True),
]
allIndivSchema = StructType(
    [StructField(name, dtype(), nullable) for name, dtype, nullable in _INDIV_COLUMNS]
)

# All inputs are pipe-delimited text files with a header row. Only the
# individual-contribution files use the explicit schema; the other three let
# Spark infer column types.
_inferred_csv_options = dict(delimiter='|', inferschema='true', header='true')

allIndiv = (
    spark.read.format("csv")
    .options(delimiter='|', header='true')
    .schema(allIndivSchema)
    .load("data/indiv_cont/itcon*.txt")
)
commToCand = spark.read.format("csv").options(**_inferred_csv_options).load('data/itpas2.txt')
candMaster = spark.read.format("csv").options(**_inferred_csv_options).load('data/cn.txt')
commmieMaster = spark.read.format("csv").options(**_inferred_csv_options).load('data/cm.txt')


                                                                                

In [6]:
# Remove the columns that none of the later joins or aggregations read.
_indiv_unused = (
    "AMNDT_IND", "RPT_TP", "TRANSACTION_PGI", "IMAGE_NUM", "TRANSACTION_TP",
    "CITY", "EMPLOYER", "OCCUPATION", "TRANSACTION_DT", "TRANSACTION_TP",
    "TRAN_ID", "FILE_NUM", "MEMO_CD", "MEMO_TEXT", "SUB_ID",
)
_comm_to_cand_unused = (
    "AMNDT_IND", "RPT_TP", "TRANSACTION_PGI", "IMAGE_NUM", "TRANSACTION_TP",
    "NAME", "CITY", "EMPLOYER", "OCCUPATION", "TRANSACTION_DT", "TRANSACTION_TP",
    "TRAN_ID", "FILE_NUM", "MEMO_CD", "MEMO_TEXT", "SUB_ID",
)
_cand_master_unused = (
    "CAND_ELECTION_YR", "CAND_OFFICE_ST", "CAND_OFFICE", "CAND_OFFICE_DISTRICT",
    "CAND_ICI", "CAND_STATUS", "CAND_ST1", "CAND_ST2", "CAND_CITY", "CAND_ST",
    "CAND_ZIP",
)
_committee_master_unused = (
    "TRES_NM", "CMTE_ST1", "CMTE_ST2", "CMTE_CITY", "CMTE_DSGN",
    "CMTE_FILING_FREQ", "CONNECTED_ORG_NM",
)

allIndiv = allIndiv.drop(*_indiv_unused)
commToCand = commToCand.drop(*_comm_to_cand_unused)
candMaster = candMaster.drop(*_cand_master_unused)
# The loaded frame is spelled `commmieMaster` (three m's); the trimmed one is `commieMaster`.
commieMaster = commmieMaster.drop(*_committee_master_unused)

In [None]:
# Publish the trimmed contributions as "indiv", keep only the rows whose
# ENTITY_TP is COM, PTY, PAC or ORG, and publish the filtered rows as "allInd".
allIndiv.createOrReplaceTempView(name="indiv")
_entity_filter_sql = """ SELECT * FROM indiv WHERE (ENTITY_TP == "COM" OR ENTITY_TP == "PTY" OR ENTITY_TP == "PAC" OR ENTITY_TP == "ORG") """
allIndiv = spark.sql(_entity_filter_sql)
allIndiv.createOrReplaceTempView(name="allInd")


In [8]:
# Register the three trimmed frames under the names the SQL cells use.
for _view_name, _frame in (
    ("allCom", commToCand),
    ("candMast", candMaster),
    ("commieMast", commieMaster),
):
    _frame.createOrReplaceTempView(_view_name)

In [None]:
# Pair every filtered contribution row with the committee-master row that has
# the same CMTE_ID, then discard the committee columns listed below.
_indiv_committee_sql = """ SELECT * FROM allInd i, commieMast c WHERE i.CMTE_ID = c.CMTE_ID"""
getcandid = spark.sql(_indiv_committee_sql).drop(
    "CMTE_ST", "CMTE_ZIP", "CMTE_PTY_AFFILIATION", "CMTE_TP", "ORG_TP"
)

In [10]:
# Make the contribution/committee join queryable from SQL as "testcand".
getcandid.createOrReplaceTempView(name="testcand")

In [11]:
# Per the author's note for commToCand: OTHER_ID is the receiver, CMTE_ID is
# the donor, and NAME is the receiver's name. Rename the two id columns to say so.
commToCand = (
    commToCand
    .withColumnRenamed("CMTE_ID","CMTEID_DONOR")
    .withColumnRenamed("OTHER_ID","CMTEID_RECEIVER")
)


In [None]:
# Committee-to-committee rows: every "allCom" row whose OTHER_ID matches a
# CMTE_ID in the committee master, with the donor/receiver ids renamed.
# TRANSACTION_AMT is projected as well because the later aggregation cells run
# SUM(TRANSACTION_AMT) over views derived from this result; without it those
# queries have no amount column to resolve.
# NOTE(review): assumes the itpas2 header names the amount column
# TRANSACTION_AMT -- confirm against the data file.
commtbl = spark.sql("""select a.CMTE_ID AS CMTID_DONOR, ENTITY_TP, STATE, ZIP_CODE, 
OTHER_ID AS CMTID_RECEIVER, 
a.CAND_ID AS CAND_ID1, 
a.TRANSACTION_AMT 
from allCom a, commieMast b 
where a.OTHER_ID == b.CMTE_ID""") 

In [None]:
# Join "allCom" to the committee master on the receiving committee
# (OTHER_ID == CMTE_ID) and remove the columns listed below.
_receiver_join_sql = "select * from allCom c, commieMast m where c.OTHER_ID == m.CMTE_ID"
st1 = spark.sql(_receiver_join_sql).drop(
    "ORG_TP", "OTHER_ID", "CMTE_ZIP", "CMTE_ST", "CMTE_TP"
)

In [16]:
# Make the donor/receiver projection queryable from SQL as "cmttbl".
commtbl.createOrReplaceTempView(name="cmttbl")

In [None]:
# Attach the committee-master row of the donating committee
# (CMTID_DONOR == CMTE_ID), drop the master columns listed below, and publish
# the result as "cmte_tbl".
_donor_join_sql = "select * from cmttbl a, commieMast b where a.CMTID_DONOR == b.CMTE_ID"
st1 = spark.sql(_donor_join_sql).drop(
    "CMTE_ST", "CMTE_TP", "CMTE_ID", "CMTE_TP","CMTE_ZIP","ORG_TP","CAND_ID"
)
st1.createOrReplaceTempView(name="cmte_tbl")

In [None]:
# Total amount per (candidate, receiver, entity type, committee name, donor),
# largest totals first; rows without a candidate id are excluded.
_totals_by_donor_sql = """ SELECT CAND_ID1 AS CAND_ID, ENTITY_TP, CMTE_NM, CMTID_DONOR, CMTID_RECEIVER, SUM(TRANSACTION_AMT) AS TOTAL 
FROM cmte_tbl WHERE CAND_ID1 IS NOT NULL GROUP BY CAND_ID1, CMTID_RECEIVER, ENTITY_TP, CMTE_NM, CMTID_DONOR ORDER BY TOTAL DESC"""
cmt_tbl = spark.sql(_totals_by_donor_sql)
cmt_tbl.createOrReplaceTempView(name="cmt_tbl")

# Total amount per (ZIP code, candidate), ordered by candidate id descending.
_totals_by_zip_sql = """ SELECT ZIP_CODE, CAND_ID1 AS CAND_ID, SUM(TRANSACTION_AMT) AS TOTAL 
FROM cmte_tbl WHERE CAND_ID1 IS NOT NULL GROUP BY ZIP_CODE, CAND_ID ORDER BY CAND_ID DESC"""
cmt_tbl_ll = spark.sql(_totals_by_zip_sql)
cmt_tbl_ll.createOrReplaceTempView(name="cmt_tbl_ll")


In [None]:
# Sum the transaction amounts of "cmte_tbl" per receiver / donor / entity type /
# state / ZIP / candidate / committee name / party, and attach a candidate-level
# TOTAL taken from "cmt_tbl". The result is published as the "commie" view.
#
# Changes from the previous query, which could not be resolved:
#   * "cmt_tbl" exposes CAND_ID, not CAN, so the join key is now produced by a
#     subquery that aliases CAND_ID to CAN.
#   * The subquery exposes only CAN and TOTAL, so ENTITY_TP, CMTE_NM,
#     CMTID_DONOR and CMTID_RECEIVER (present in both views) are no longer
#     ambiguous.
#   * The raw TRANSACTION_AMT column was removed from GROUP BY; grouping by it
#     split every group by individual amount and defeated the SUM.
# NOTE(review): the subquery collapses "cmt_tbl" to one row per candidate
# (SUM(TOTAL)) so that joining on the candidate id alone does not multiply
# rows. This assumes TOTAL here is meant to be the per-candidate total --
# confirm the intended meaning.
st8 = spark.sql("""select CAN, CMTID_RECEIVER, CMTID_DONOR, ENTITY_TP, STATE, ZIP_CODE, 
                SUM(TRANSACTION_AMT) AS TRANSACTION_AMT, CAND_ID1, CMTE_NM, CMTE_PTY_AFFILIATION, TOTAL 
                from cmte_tbl a 
                join (select CAND_ID AS CAN, SUM(TOTAL) AS TOTAL from cmt_tbl GROUP BY CAND_ID) b 
                on a.CAND_ID1 == b.CAN 
                GROUP BY CAN, CMTID_RECEIVER, CMTID_DONOR, ENTITY_TP, STATE, ZIP_CODE,
                CAND_ID1, CMTE_NM, CMTE_PTY_AFFILIATION, TOTAL ORDER BY CMTID_RECEIVER ASC""") 
# CAN duplicates CAND_ID1 and is only needed for the join.
st8 = st8.drop("CAN")
st8.createOrReplaceTempView("commie")

In [None]:
# Total amount per (receiver, donor) pair over the "commie" view.
w = spark.sql(""" select SUM(TRANSACTION_AMT) AS TRANSACTION_AMT, CMTID_RECEIVER, CMTID_DONOR FROM commie GROUP BY CMTID_RECEIVER, CMTID_DONOR """)
w.createOrReplaceTempView("w")

# Attach the descriptive "commie" columns to each receiver/donor total.
# Changes from the previous query:
#   * The select list ended in a stray "c." before FROM, which left the
#     statement malformed; it has been removed.
#     NOTE(review): if another column of "commie" was meant there, add it.
#   * "w" and "commie" were combined without any join condition (a Cartesian
#     product); they are now matched on the pair that "w" is grouped by.
p = spark.sql(""" select w.TRANSACTION_AMT, w.CMTID_RECEIVER, w.CMTID_DONOR, c.ENTITY_TP, c.STATE, c.CAND_ID1 AS CAND_ID, c.CMTE_NM, c.CMTE_PTY_AFFILIATION 
from w join commie c 
on w.CMTID_RECEIVER == c.CMTID_RECEIVER and w.CMTID_DONOR == c.CMTID_DONOR""")


In [None]:
# All "commie" rows donated by committee C00694323, ordered by receiver id.
_single_donor_sql = """ Select * from commie where cmtid_donor = "C00694323" ORDER BY CMTID_RECEIVER ASC """
l2 = spark.sql(_single_donor_sql)

In [None]:
# Keep only the first five characters of every ZIP_CODE value.
_zip_first_five = cmt_tbl_ll['ZIP_CODE'].substr(startPos=1, length=5)
cmt_tbl_ll = cmt_tbl_ll.withColumn('ZIP_CODE', _zip_first_five)

In [223]:
from geopy.geocoders       import GoogleV3
from pyspark.sql.functions import col, udf, when
from pyspark.sql.types     import FloatType, ArrayType

In [224]:
import os

# The Google geocoding API key is read from the GOOGLE_API_KEY environment
# variable so that it is never stored in the notebook or its history. The key
# that was previously hardcoded here must be treated as leaked and revoked.
_google_api_key = os.environ.get("GOOGLE_API_KEY")
if not _google_api_key:
    raise RuntimeError(
        "Set the GOOGLE_API_KEY environment variable before creating the geocoder."
    )
geolocator = GoogleV3(api_key=_google_api_key)


In [225]:
# Copy of the per-ZIP totals with an extra all-null "long_lat" column.
zipc = cmt_tbl_ll.withColumn(colName="long_lat", col=lit(None))

In [226]:
# Make the per-ZIP totals queryable from SQL as "zipc".
zipc.createOrReplaceTempView(name="zipc")

In [None]:
# Distinct ZIP codes present in the totals, and how many of them there are.
_zip_column_only = zipc.select('ZIP_CODE')
zipcode = _zip_column_only.distinct()
rows = zipcode.count()

In [None]:
# Bring the distinct ZIP codes to the driver as a plain Python list
# (first field of every row).
coords = [record[0] for record in zipcode.rdd.collect()]

In [86]:
# Upper bound on the number of ZIP codes sent to the geocoder in one run
# (previously an inline literal in the loop).
MAX_GEOCODE_LOOKUPS = 2814


def _geocode_to_text(geocoder, query):
    """Return the text "(latitude, longitude)" for `query`.

    Returns "" when the geocoder raises or reports no match, so the caller
    always gets exactly one value per query.
    """
    try:
        location = geocoder.geocode(query)
    except Exception:
        # Best effort: one failed lookup must not abort the whole batch.
        return ""
    if location is None:
        return ""
    return str((location.latitude, location.longitude))


# Geocode the first MAX_GEOCODE_LOOKUPS entries of `coords`, keeping one result
# per ZIP code and in the same order. Failed lookups are stored as "" instead
# of being skipped: the results are later paired with `coords` by position, so
# a skipped entry would shift every following coordinate onto the wrong ZIP.
geocoded = []
failed_zips = []
for zip_code in coords[:MAX_GEOCODE_LOOKUPS]:
    lat_lon_text = _geocode_to_text(geolocator, zip_code)
    if not lat_lon_text:
        failed_zips.append(zip_code)
    geocoded.append(lat_lon_text)

if failed_zips:
    print(f"{len(failed_zips)} of {len(geocoded)} ZIP codes could not be geocoded")

# Build the DataFrame once, after the loop. The explicit schema keeps this
# working even when there is nothing to geocode.
coordinates = spark.createDataFrame(
    [(value,) for value in geocoded],
    StructType([StructField('id', StringType(), True)]),
)

In [230]:
# Make the geocoding results queryable from SQL as "coordinates".
coordinates.createOrReplaceTempView(name="coordinates")

In [231]:
# Pull the geocoding results back to the driver as a plain Python list
# (first field of every row).
zip_ll = [record[0] for record in coordinates.rdd.collect()]

In [None]:
# Pair each ZIP code with the coordinate text at the same position and turn
# the pairs into a two-column DataFrame. `zip_ll` is a list before this line
# and a DataFrame after it.
_zip_coordinate_pairs = [(zip_code, lat_lon) for zip_code, lat_lon in zip(coords, zip_ll)]
zip_ll = spark.createDataFrame(_zip_coordinate_pairs, schema=['zip', 'll'])

In [233]:
# Make the ZIP-to-coordinate pairs queryable from SQL as "zip_ll".
zip_ll.createOrReplaceTempView(name="zip_ll")

In [None]:
# Attach the coordinate text to every per-ZIP total, remove the placeholder
# "long_lat" column and the duplicate "zip" key, and publish the result as
# "commie" (replacing any view already registered under that name).
_zip_join_sql = """ SELECT * from zipc, zip_ll WHERE zipc.ZIP_CODE == zip_ll.zip"""
combine = spark.sql(_zip_join_sql).drop("long_lat","zip")
combine.createOrReplaceTempView(name="commie")

In [251]:
# Committee table: one row per candidate with a "comtbl" array of
# (donor id, entity type, total, committee name) structs.
ID = spark.sql("""SELECT DISTINCT CAND_ID FROM cmt_tbl""")

tbl1 = spark.sql("""SELECT ENTITY_TP,
TOTAL, CMTID_DONOR, CAND_ID, CMTE_NM FROM cmt_tbl ORDER BY TOTAL DESC""")

_donor_struct = struct(tbl1.CMTID_DONOR,tbl1.ENTITY_TP,tbl1.TOTAL,tbl1.CMTE_NM)
_donors_by_candidate = tbl1.groupby("CAND_ID").agg(collect_list(_donor_struct).alias("comtbl"))
commietbl = ID.join(_donors_by_candidate, on="CAND_ID", how="outer")

In [252]:
# Location table: one row per candidate with a "comtbl_ll" array of
# (ZIP code, coordinate text, total) structs.
ID = spark.sql("""SELECT DISTINCT CAND_ID FROM commie""")

tbl1 = spark.sql("""SELECT CAND_ID, ZIP_CODE, ll, TOTAL FROM commie ORDER BY TOTAL DESC""")

_location_struct = struct(tbl1.ZIP_CODE,tbl1.ll,tbl1.TOTAL)
_locations_by_candidate = tbl1.groupby("CAND_ID").agg(collect_list(_location_struct).alias("comtbl_ll"))
commietbl_ll = ID.join(_locations_by_candidate, on="CAND_ID", how="outer")

In [257]:
# Register both per-candidate tables for the final SQL join.
for _view_name, _frame in (
    ("commietbl_ll", commietbl_ll),
    ("commietbl", commietbl),
):
    _frame.createOrReplaceTempView(_view_name)


In [258]:
# One row per candidate carrying both the committee array and the location array.
_per_candidate_join_sql = """SELECT x.CAND_ID, x.comtbl, y.comtbl_ll FROM commietbl x, commietbl_ll y WHERE y.CAND_ID = x.CAND_ID """
joined = spark.sql(_per_candidate_join_sql)

In [None]:
# Candidate-master rows for every candidate that appears in `commietbl`,
# published as the "commtotalcont" view.
commietbl.createOrReplaceTempView("e")
# `commietbl` (view "e") only has the columns CAND_ID and comtbl, so the
# previous join on e.CAND_ID1 could not be resolved. Join on e.CAND_ID instead
# and expose it as CAND_ID1, so the drop/rename below ends up with exactly one
# CAND_ID column.
nev = spark.sql(""" SELECT e.CAND_ID AS CAND_ID1, y.* from e, candMast y WHERE e.CAND_ID == y.CAND_ID""")
nev = nev.drop("comtbl", "CAND_ID")
nev = nev.withColumnRenamed("CAND_ID1","CAND_ID")
# NOTE(review): none of the inputs visible here provide a TOTAL column, so this
# rename has no effect unless candMast carries one -- confirm where
# COMMDON_TOTAL is supposed to come from.
nev = nev.withColumnRenamed("TOTAL","COMMDON_TOTAL")
nev.createOrReplaceTempView("commtotalcont")


In [None]:
# Write the committee table as JSON from a single partition.
_commietbl_writer = commietbl.coalesce(numPartitions=1).write.format('json')
_commietbl_writer.save('output_commie_tbl')

In [None]:
# Write the location table as JSON from a single partition.
_commietbl_ll_writer = commietbl_ll.coalesce(numPartitions=1).write.format('json')
_commietbl_ll_writer.save('output_commie_tbl_ll')

In [None]:
# Write the combined per-candidate table as JSON from a single partition.
_joined_writer = joined.coalesce(numPartitions=1).write.format('json')
_joined_writer.save('output_commie_joined')