In [10]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

import sys
# Configuration for a single-machine Spark session.
conf = SparkConf()
# Run in local mode with 30 worker threads.
conf.set("spark.master", "local[30]")
# Memory budget for the driver / executor JVM and for results collected to the driver.
conf.set("spark.driver.memory", "100g")
conf.set("spark.executor.memory", "100g")
conf.set("spark.driver.maxResultSize", "100g")
# Effectively disable network timeouts for very long jobs.
conf.set("spark.network.timeout","1000000000s")
# The heartbeat interval must stay strictly below spark.network.timeout:
# Spark's configuration validation rejects settings where it is not (the
# original used the same value for both), so keep a regular heartbeat.
conf.set("spark.executor.heartbeatInterval","60s")
spark = SparkSession.builder.config(conf=conf).appName("convertProfile").getOrCreate()

from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import Tokenizer, RegexTokenizer, CountVectorizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import StopWordsRemover


from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.ml.linalg import SparseVector, DenseVector
from pyspark.sql.types import *
from pyspark.sql.functions import lit, col, regexp_replace
from pyspark.sql.functions import split, explode




from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import Tokenizer, RegexTokenizer, CountVectorizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType


from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.ml.linalg import SparseVector, DenseVector
from pyspark.sql.types import *
from pyspark.sql.functions import lit, col, regexp_replace
from pyspark.sql.functions import split, explode


In [19]:
class PokeProfile():
    """
    Bag-of-words feature extraction for one text column of a CSV file.

    Each step stores its result on the instance, so the methods are meant to
    be called in this order:
    readProfile -> formatHeaders -> listStringColumns -> tokenize ->
    saveVocabulary -> flattenVectorColumns -> saveOutput.
    readProfile uses the module-level ``spark`` session.
    """
    def __init__(self, userFile = "data/policy_adoption.csv"):
        """
        :type userFile: str: path of the comma-separated file read by readProfile

        The class was originally written for the Pokec profile data, which
        can be downloaded at http://snap.stanford.edu/data/soc-Pokec.html
        or run:
        wget http://snap.stanford.edu/data/soc-pokec-relationships.txt.gz .
        wget http://snap.stanford.edu/data/soc-pokec-profiles.txt.gz .
        zcat soc-pokec-relationships.txt.gz > soc-pokec-relationships.txt
        zcat soc-pokec-profiles.txt.gz > soc-pokec-profiles.txt
        mkdir output # place to save output parquet
        mkdir vocabulary # place to save vocabulary (vector-word mapping)
        """
        # The default is the file the original code ended up using.
        self.userFile = userFile

    def readProfile(self):
        """
        read the profile data from self.userFile (comma separated, first line
        is the header) into self.profiles_; nulls in string columns become ""
        """
        reader = spark.read.option("delimiter", ",").option("header","true")
        self.profiles_ = reader.csv(self.userFile).na.fill("")

    def formatHeaders(self, headers = [ "user_id","year","title"]):
        """
        provide the headers of the data frame
        :type headers: list[str]: new names, applied to the columns by position;
                       if the lengths differ the extra entries are ignored
        :rtype: None
        """
        for c,n in zip(self.profiles_.columns,headers):
            self.profiles_ = self.profiles_.withColumnRenamed(c,n)

    def tokenize(self, inputColProfile = "I_am_working_in_field", vocSize = 40, minDF = 1.0):
        """
        tokenize string column, count the occurence of words and then use the occurence of the top words as vector
        :type inputColProfile: str: column to extract the vector
        :type vocSize: int: number of words to count
        :type minDF: float: minimun document frequency of the word
        :rtype: None

        On success the input column is replaced by "<inputColProfile>_dense",
        an array<double> of word counts, and the fitted model is kept in
        self.model. On failure a message is printed, the helper columns are
        removed and the input column is left in place.
        """
        self.vocSize = vocSize
        self.minDF = minDF
        self.inputColProfile = inputColProfile
        self.outputColProfile = "{}_words".format(self.inputColProfile)
        self.outputColProfileStop = "{}_words_stp".format(self.inputColProfile)
        self.outputTokensColProfile = "{}_tokens".format(self.inputColProfile)
        self.outputTokensDenseColProfile = "{}_dense".format(self.inputColProfile)

        # Split on every non-word character and every digit.
        regexTokenizer = RegexTokenizer(inputCol=self.inputColProfile, outputCol=self.outputColProfile, pattern="\\W|\\d")
        self.profiles_ = regexTokenizer.transform(self.profiles_)

        remover = StopWordsRemover(inputCol=self.outputColProfile, outputCol=self.outputColProfileStop)
        self.profiles_ = remover.transform(self.profiles_)

        self.cv = CountVectorizer(inputCol=self.outputColProfileStop,
                             outputCol=self.outputTokensColProfile,
                             vocabSize=self.vocSize, minDF=self.minDF)
        try:
            self.model = self.cv.fit(self.profiles_)
            self.profiles_ = self.model.transform(self.profiles_)
            # Turn the ML vector into a plain array so elements can be selected by index.
            vector_udf = udf(lambda vector: vector.toArray().tolist(),ArrayType(DoubleType()))

            self.profiles_ = self.profiles_.withColumn(self.outputTokensDenseColProfile, vector_udf(self.outputTokensColProfile))
            for finished in (self.inputColProfile, self.outputColProfile,
                             self.outputColProfileStop, self.outputTokensColProfile):
                self.profiles_ = self.profiles_.drop(finished)
        except Exception as exc:
            # Best effort: report why the column failed instead of hiding it,
            # and do not leave helper columns behind. Exception (not a bare
            # except) lets KeyboardInterrupt / SystemExit propagate.
            print("Tokenizing {} Failed: {!r}".format(self.inputColProfile, exc))
            for leftover in (self.outputColProfile, self.outputColProfileStop,
                             self.outputTokensColProfile):
                self.profiles_ = self.profiles_.drop(leftover)

    def flattenVectorColumns(self, selected_columns = ["user_id", "year"]):
        """
        convert from
            col1=[0,1,2], col2=[0,1,2], col3=3, col4=0
            to
            col1.0,col1.1,col1.2,col2.0, col2.1,col2.2, col3, col4

        :type selected_columns: list[str]: plain columns kept in front of the flattened ones
        :rtype: None, the result is stored in self.profiles_flatten

        Requires a previous listStringColumns call (it provides self.index,
        self.cnt_each and self.all_columns) and a previous tokenize call (it
        provides self.vocSize and the "<column>_dense" columns).
        """
        # Copy so that the shared default list is never aliased on the instance.
        self.selected_columns = list(selected_columns)
        stringColumns = self.listStringColumns(self.index, self.cnt_each, all_columns = self.all_columns)
        stringColumns = [column + "_dense" for column in stringColumns]
        # One output column per (dense column, vocabulary index) pair.
        self.newColumns = [self.profiles_[column][i] for column in stringColumns for i in range(self.vocSize)]
        self.nonstringColumns = [column for column in self.profiles_.columns if column not in stringColumns]
        self.profiles_flatten = self.profiles_.select(self.selected_columns + self.newColumns)

    def saveVocabulary(self):
        """
        save the vocabulary to a separate file;
        vocabulary can work as a look up table for the word given the index in the word vector

        The file is data/vocabulary/<inputColProfile>.txt, tab separated, one
        "index<TAB>word" line per vocabulary entry; the directory is created
        when it does not exist yet.
        """
        import os
        import pandas as pd
        vocabularyFile = "data/vocabulary/{}.txt".format(self.inputColProfile)
        vocabularyDir = os.path.dirname(vocabularyFile)
        if not os.path.isdir(vocabularyDir):
            os.makedirs(vocabularyDir)
        pd.DataFrame(self.model.vocabulary).to_csv(vocabularyFile,
                                                   sep='\t', encoding='utf-8', header=False)

    def listStringColumns(self, index = 0, cnt_each = 10, all_columns = ["title"]):
        """
        list of string columns in the data
        :type index: int: number of the batch to return (0-based)
        :type cnt_each: int: number of columns per batch
        :type all_columns: list[str]: all string columns to process
        :rtype: list[str]: columns index*cnt_each .. (index+1)*cnt_each - 1

        The arguments are remembered on the instance for flattenVectorColumns.
        """
        self.index = index
        self.cnt_each = cnt_each
        # Copy so that the shared default list is never aliased on the instance.
        self.all_columns = list(all_columns)
        self.cnt_string = len(self.all_columns)
        start, end = index*cnt_each, (index+1)*cnt_each
        return self.all_columns[start:end]

    def saveOutput(self, data, outputfile = "soc-pokec-profiles-vector", save_format = "parquet"):
        """
        save data as a single-partition parquet (default) or csv data set
        :type data: DataFrame: data to write
        :type outputfile: str: output path without extension
        :type save_format: str: "parquet" writes <outputfile>.parquet,
                           any other value writes <outputfile>.csv
        :rtype: None
        """
        if save_format == "parquet":
            data.repartition(1).write.parquet("{}.parquet".format(outputfile))
        else:
            data.repartition(1).write.csv("{}.csv".format(outputfile))

from pyspark.sql.functions import array, col, explode, struct, lit

# Tiny example frame for trying out to_long. Built through the `spark` session
# because no `sc` variable is defined anywhere in this notebook.
df = spark.createDataFrame([(1, 0.0, 0.6), (1, 0.6, 0.7)], ["A", "col_1", "col_2"])

def to_long(df, by):
    """
    Reshape a wide data frame into long (key, value) format.

    Every column that is not listed in `by` becomes one output row per input
    row, with the column name in "key" and its value in "val".

    :type df: DataFrame: wide input frame
    :type by: list[str]: identifier columns repeated on every output row
    :rtype: DataFrame with the columns by + ["key", "val"]
    :raises ValueError: when no column is left to unpivot
    :raises AssertionError: when the unpivoted columns do not share one type
    """
    # Accept any sequence of names; list concatenation below needs a list.
    by = list(by)

    # Filter dtypes and split into column names and type description
    value_columns = [(c, t) for (c, t) in df.dtypes if c not in by]
    if not value_columns:
        # Without this guard the unpacking below fails with a cryptic message.
        raise ValueError("to_long: no columns left to unpivot besides {}".format(by))
    cols, dtypes = zip(*value_columns)
    # Spark SQL supports only homogeneous columns
    assert len(set(dtypes)) == 1, "All columns have to be of the same type"

    # Create and explode an array of (column_name, column_value) structs
    kvs = explode(array([
      struct(lit(c).alias("key"), col(c).alias("val")) for c in cols
    ])).alias("kvs")

    return df.select(by + [kvs]).select(by + ["kvs.key", "kvs.val"])

In [73]:
# Scholar / seminar-title run.
# The input path is set explicitly: the class default is the policy file,
# while the headers applied here (user_id, year, title) and the
# "scholar_top" output written later belong to the seminar data
# (presumably data/seminar_year.csv - confirm the path).
PP = PokeProfile()
PP.userFile = "data/seminar_year.csv"
PP.readProfile()
PP.formatHeaders()

In [74]:
# Work through the string columns in batches of one column; a single batch here.
for i in range(1):
    print(i)
    batch = PP.listStringColumns(i, cnt_each = 1)
    for eachColumn in batch:
        # Count the 40 most frequent words, store the word list, then spread
        # the count vector over one column per word.
        PP.tokenize(inputColProfile = eachColumn, vocSize = 40)
        PP.saveVocabulary()
        PP.flattenVectorColumns()

In [81]:
# Name the flattened columns after the vocabulary words (matched by position).
headers = ["user_id", "year"] + PP.model.vocabulary
renamed = PP.profiles_flatten
for old_name, new_name in zip(renamed.columns, headers):
    renamed = renamed.withColumnRenamed(old_name, new_name)
PP.profiles_flatten = renamed


In [82]:
# Reshape to one (user_id, year, key, val) row per word and write a single CSV part.
scholar_ids = ["user_id", "year"]
PP.profiles_flatten = to_long(PP.profiles_flatten, scholar_ids).repartition(1)
scholar_path = "data/scholar_top_{}.csv".format(PP.vocSize)
PP.profiles_flatten.write.csv(scholar_path, header = False, mode="overwrite")

[Row(user_id=u'Ang', year=u'2018', networks=0.0, social=0.0, anomaly=0.0, twitter=0.0, dynamic=0.0, academic=0.0, hypersphere=0.0, conferences=0.0, detection=0.0, mobile=0.0, analysis=0.0, media=0.0, modeling=0.0, information=0.0, event=0.0, sentiment=0.0, unsupervised=0.0, beyond=1.0, robust=0.0, process=0.0, scale=0.0, march=1.0, feet=1.0, embedding=0.0, effect=0.0, discovery=0.0, method=0.0, protests=1.0, visual=0.0, seeking=0.0, data=0.0, role=0.0, large=0.0, analytics=0.0, talking=1.0, events=0.0, time=0.0, streaming=0.0, deep=0.0, regression=0.0),
 Row(user_id=u'Ang', year=u'2018', networks=0.0, social=0.0, anomaly=0.0, twitter=0.0, dynamic=0.0, academic=0.0, hypersphere=0.0, conferences=0.0, detection=0.0, mobile=0.0, analysis=0.0, media=0.0, modeling=0.0, information=0.0, event=0.0, sentiment=0.0, unsupervised=0.0, beyond=0.0, robust=0.0, process=0.0, scale=0.0, march=0.0, feet=0.0, embedding=0.0, effect=0.0, discovery=0.0, method=0.0, protests=0.0, visual=0.0, seeking=0.0, dat

In [63]:
# Policy run: load the policy file and name its columns.
policy_headers = [
    "policy_id",
    "policy_name",
    "policy_subject_id",
    "policy_start",
    "policy_end",
    "policy_description",
    "policy_lda_1",
    "policy_lda_2",
    "policy_lda_3",
]
PP = PokeProfile()
PP.readProfile()
PP.formatHeaders(headers=policy_headers)

In [64]:
# Keep policies that start in 1970 or later. policy_start is compared with an
# integer here although the rows shown below hold it as a string.
starts_1970_or_later = PP.profiles_["policy_start"] >= 1970
PP.profiles_ = PP.profiles_.filter(starts_1970_or_later)

In [65]:
# Preview the first three policy rows.
PP.profiles_.take(3)

[Row(policy_id=u'aborparc', policy_name=u'1-parent Consent for Abortion by a Minor', policy_subject_id=u'12', policy_start=u'1981', policy_end=u'1999', policy_description=u'1-parent Consent for Abortion by a Minor ', policy_lda_1=u'19', policy_lda_2=u'NULL', policy_lda_3=u'NULL'),
 Row(policy_id=u'aborparn', policy_name=u'1-parent Notification for Abortion by a Minor', policy_subject_id=u'12', policy_start=u'1981', policy_end=u'2000', policy_description=u'1-parent Notification for Abortion by a Minor ', policy_lda_1=u'19', policy_lda_2=u'NULL', policy_lda_3=u'NULL'),
 Row(policy_id=u'abortion_consent_1973_199', policy_name=u'Does The State Mandate Counseling Before An Abortion (Pre-Casey)?', policy_subject_id=u'2', policy_start=u'1973', policy_end=u'1991', policy_description=u'Does The State Mandate Counseling Before An Abortion (Pre-Casey)?', policy_lda_1=u'19', policy_lda_2=u'NULL', policy_lda_3=u'NULL')]

In [66]:
# First (and only) batch of string columns. The batch index is given
# explicitly instead of reusing the loop variable `i` left over from an
# earlier cell, which is 0 there, so the result is unchanged.
PP.listStringColumns(0, cnt_each = 1, all_columns = ["policy_name"])

['policy_name']

In [67]:
# Same pipeline as the scholar run, applied to the policy name column.
policy_id_columns = ["policy_start", "policy_end", "policy_id"]
for i in range(1):
    print(i)
    batch = PP.listStringColumns(i, cnt_each = 1, all_columns = ["policy_name"])
    for eachColumn in batch:
        PP.tokenize(inputColProfile = eachColumn, vocSize = 40)
        PP.saveVocabulary()
        PP.flattenVectorColumns(selected_columns = policy_id_columns)

0


In [68]:
# Name the flattened columns after the vocabulary words (matched by position).
headers = PP.selected_columns + PP.model.vocabulary
renamed = PP.profiles_flatten
for old_name, new_name in zip(renamed.columns, headers):
    renamed = renamed.withColumnRenamed(old_name, new_name)
PP.profiles_flatten = renamed


In [69]:
# Preview one wide row: identifier columns followed by one count per word.
PP.profiles_flatten.take(1)

[Row(policy_start=u'1981', policy_end=u'1999', policy_id=u'aborparc', state=0.0, law=0.0, provides=0.0, ban=0.0, abortion=1.0, states=0.0, property=0.0, public=0.0, sex=0.0, regulates=0.0, child=0.0, insurance=0.0, laws=0.0, establishes=0.0, agreement=0.0, requires=0.0, management=0.0, notification=0.0, planning=0.0, tax=0.0, governs=0.0, credit=0.0, allows=0.0, system=0.0, legal=0.0, allow=0.0, limits=0.0, provide=0.0, coverage=0.0, level=0.0, offenders=0.0, use=0.0, abortions=0.0, health=0.0, electronic=0.0, real=0.0, act=0.0, rules=0.0, regulating=0.0, framework=0.0)]

In [70]:
# Reshape to one (policy_start, policy_end, policy_id, key, val) row per word.
# Nothing is written here; the joined table is saved further below.
policy_long = to_long(PP.profiles_flatten, PP.selected_columns)
PP.profiles_flatten = policy_long.repartition(1)

In [71]:
# Preview the long format.
PP.profiles_flatten.take(3)

[Row(policy_start=u'1981', policy_end=u'1999', policy_id=u'aborparc', key=u'state', val=0.0),
 Row(policy_start=u'1981', policy_end=u'1999', policy_id=u'aborparc', key=u'law', val=0.0),
 Row(policy_start=u'1981', policy_end=u'1999', policy_id=u'aborparc', key=u'provides', val=0.0)]

In [72]:
# Load the per-state adoption records (comma separated, with header row) and
# replace nulls in string columns by empty strings.
state_reader = spark.read.option("delimiter", ",").option("header","true")
PP.profiles_state = state_reader.csv("data/policy_adoption_state.csv").na.fill("")


In [73]:
# Preview the state adoption records.
PP.profiles_state.take(3)

[Row(policy_id=u'corporateeff_yearadopted', adopted_year=u'1990', state_id=u'HI', subject_name=u'Macroeconomics'),
 Row(policy_id=u'corporateeff_yearadopted', adopted_year=u'1990', state_id=u'MA', subject_name=u'Macroeconomics'),
 Row(policy_id=u'corporateeff_yearadopted', adopted_year=u'1990', state_id=u'NC', subject_name=u'Macroeconomics')]

In [80]:
# Attach the keyword rows to every state adoption record of the same policy.
# Both inputs carry policy_id; the copy coming from the state table is dropped.
adoptions, keywords = PP.profiles_state, PP.profiles_flatten
same_policy = adoptions["policy_id"] == keywords["policy_id"]
joined = adoptions.join(keywords, same_policy)
PP.profiles_all = joined.select(adoptions["*"], keywords["*"]).drop(adoptions["policy_id"])

In [81]:
# Preview the joined table.
PP.profiles_all.take(5)

[Row(adopted_year=u'1990', state_id=u'HI', subject_name=u'Macroeconomics', policy_start=u'1990', policy_end=u'2008', policy_id=u'corporateeff_yearadopted', key=u'framework', val=0.0),
 Row(adopted_year=u'1990', state_id=u'HI', subject_name=u'Macroeconomics', policy_start=u'1990', policy_end=u'2008', policy_id=u'corporateeff_yearadopted', key=u'regulating', val=0.0),
 Row(adopted_year=u'1990', state_id=u'HI', subject_name=u'Macroeconomics', policy_start=u'1990', policy_end=u'2008', policy_id=u'corporateeff_yearadopted', key=u'rules', val=0.0),
 Row(adopted_year=u'1990', state_id=u'HI', subject_name=u'Macroeconomics', policy_start=u'1990', policy_end=u'2008', policy_id=u'corporateeff_yearadopted', key=u'act', val=0.0),
 Row(adopted_year=u'1990', state_id=u'HI', subject_name=u'Macroeconomics', policy_start=u'1990', policy_end=u'2008', policy_id=u'corporateeff_yearadopted', key=u'real', val=0.0)]

In [83]:
# Write the joined table as a single CSV part with a header row, replacing
# any previous output at the same path.
policy_path = "data/policy_top_{}.csv".format(PP.vocSize)
PP.profiles_all = PP.profiles_all.repartition(1)
PP.profiles_all.write.csv(policy_path, header = True, mode="overwrite")

In [96]:
# import pandas as pd
# self = PP
# picso = pd.read_csv("data/picso.csv", header=None)
# picso.columns = ['member', 'year', 'keyword', 'value']
# # policy_group = policy.groupby(self.column)['adoption'].sum()
# # policy_group1 = policy_group.unstack(fill_value=0).to_panel()
# # self.hist = policy_group1.fillna(0).values