In [3]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("appName").master("local[*]").getOrCreate()
#from pyspark.sql import Row
from pyspark.sql import HiveContext
sqlc = HiveContext(spark)

In [74]:
# hive 与 dataframe 之间的转换
df = spark.createDataFrame([(1,144.5,5.9,33,'罗'),(2,167.2,5.4,45,'站'),(3,124.1,5.2,23,'神'),(4,144.5,5.9,33,'m'),(5,133.2,5.7,54,'f'),(3,124.1,5.2,23,'f'),(5,129.2,5.2,23,'m')],['id','weight','height','age','gender'])

# 在hive 中创建相关的表。 
sqlc.sql("use lk")
sqlc.sql("drop table if exists sxml_1013")
sqlc.sql("create table sxml_1013(id int,weight float,height float,age int,gender string ) row format delimited fields terminated by '\t' lines terminated by '\n'")

#然后将dataframe 的数据导入到hive中
df.write.insertInto("sxml_1013")  #成功

# 从hive 中拿出来的数据是 dataframe型的数据。
td = sqlc.sql("select * from sxml_1013 limit 2")
td.show() 

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  1| 144.5|   5.9| 33|     罗|
|  2| 167.2|   5.4| 45|     站|
+---+------+------+---+------+



In [82]:
'''text_file = sc.textFile("hdfs://...")
counts = text_file.flatMap(lambda line: line.split(" ")) \
             .map(lambda word: (word, 1)) \
             .reduceByKey(lambda a, b: a + b)
counts.saveAsTextFile("hdfs://...")'''

# sc =spark.sparkContext
from pyspark.sql.types import *

# 从hdfs中拿数据 回到hive中 或者hdfs中去
data = sc.textFile("hdfs://luokuideMacBook-Pro.local:8020/user/hive/warehouse/lk.db/sxml_1013")
data = data.map(lambda r:r.split('\t'))# sc 拿的是rdd

# 对数据进行处理之后 转换成 dataframe
names="id weight height age gender"
fields = [StructField(field_name, StringType(), True) for field_name in names.split()]
schema = StructType(fields)
data = spark.createDataFrame(data,schema)


'''与上面等价但是更灵活
import pyspark.sql.types as typ
labelx = [('INFANT_ALIVE_AT_REPORT',typ.IntegerType()),
        ('BIRTH_PLACE',typ.StringType()),
         ('MOTHER_WEIGHT_GAIN',typ.FloatType()),
         ('PREV_BIRTH_PRETERM',typ.IntegerType())]
shema = typ.StructType([typ.StructField(e[0],e[1],False) for e in labelx])'''


# 
data.createOrReplaceTempView("people")
tempdata = spark.sql('select * from people where gender="m"')
tempdata.show() # 也是dataframe 型的

#将 sql处理好的数据导入hive，然后hdfs中也会有类似的数据。
sqlc.sql("drop table if exists u_data_416")
sqlc.sql("CREATE TABLE u_data_416 \
               (id INT,weight float,height float,age float,gender string) \
               ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'")
tempdata.write.insertInto("u_data_416")


# 将detaframe 转换成rdd 之后再存入 hdfs中去就可以了。但是hive中是没有数据的
temp = tempdata.rdd.map(lambda row:[e for e in row])
#temp.take(2)
temp.saveAsTextFile("hdfs://luokuideMacBook-Pro.local:8020/user/hive/warehouse/lk.db/data_416_test")

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  4| 144.5|   5.9| 33|     m|
|  5| 129.2|   5.2| 23|     m|
+---+------+------+---+------+



In [None]:
# global_temp的用法
df.createGlobalTempView("people")
# Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
spark.newSession().sql("SELECT * FROM global_temp.people").show()

# 对 rdd数据的查询
from pyspark.sql import Row
sc = spark.sparkContext
# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt") #外部表
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1]))) #注意列名

# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")

teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# The results of SQL queries are Dataframe objects.
# rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
for name in teenNames:
    print(name)

In [None]:
读取其他格式的数据集

df = spark.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

df = spark.read.load("examples/src/main/resources/people.json", format="json")
df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")

df = spark.read.load("examples/src/main/resources/people.csv",
                     format="csv", sep=":", inferSchema="true", header="true")

df = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")


import numpy as np
import pandas as pd
# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
# Generate a Pandas DataFrame
pdf = pd.DataFrame(np.random.rand(100, 3))
# Create a Spark DataFrame from a Pandas DataFrame using Arrow
df = spark.createDataFrame(pdf)
# Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
result_pdf = df.select("*").toPandas()


import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# Declare the function and create the UDF
def multiply_func(a, b):
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())
# The function for a pandas_udf should be able to execute with local Pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0    1
# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()


from pyspark.sql.functions import pandas_udf, PandasUDFType
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def substract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").apply(substract_mean).show()

In [85]:
data = sc.textFile("hdfs://luokuideMacBook-Pro.local:8020 \
                   /user/hive/warehouse/lk.db/sxml_1013",fieldDelim='\t')
data.take(2)


TypeError: textFile() got an unexpected keyword argument 'fieldDelim'

In [2]:
print('count of rows:{0}'.format(df.count()))
#show 和 take 在dataframe 上用法的差异
#注意产看是否有重复行的数据
print('count of distinct rows{0}'.format(df.distinct().count()))
# 进行去重
df =df.dropDuplicates() 

print("Count if ids:{0}".format(df.select([c for c in df.columns if c!='id']).distinct().count()))

count of rows:7
+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  1| 144.5|   5.9| 33|     m|
|  2| 167.2|   5.4| 45|     m|
|  3| 124.1|   5.2| 23|     f|
|  4| 144.5|   5.9| 33|     m|
|  5| 133.2|   5.7| 54|     f|
|  3| 124.1|   5.2| 23|     f|
|  5| 129.2|   5.2| 23|     m|
+---+------+------+---+------+

None
[Row(id=1, weight=144.5, height=5.9, age=33, gender='m'), Row(id=2, weight=167.2, height=5.4, age=45, gender='m')]


In [110]:
df1 = df.select([c for c in df.columns if c!='id']) # 如何筛选列数据？
df11 = df.select([c for c in df.columns if 'age' and 'id' not in c])  
# 注意这里的用发 ，巧妙啊 利用 and 可以减少个数 and 和& 在不同场景下的使用
df12 = df.select([c for c in df.columns if 'e' not in c])  #注意这里的用法
#df2 = df1.distinct()

In [112]:
df3 = df.dropDuplicates(subset=[c for c in df.columns if 'id' not in c]) # 利用subset 进行筛选 筛选，行的方法
# 根据条件去重。
df3.where('height>5 and weight<142').show() # 查看 符合某种条件的 数据 满足多种条件的筛选方法 
# where 的用发
# df_miss.where('id<2 or id>2').show()
#df3.show()

+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  3| 124.1|   5.2| 23|     f|
|  5| 129.2|   5.2| 23|     m|
|  5| 133.2|   5.7| 54|     f|
+---+------+------+---+------+



In [113]:
import pyspark.sql.functions as fn
df3.agg(fn.count('id').alias('count'),fn.countDistinct('id').alias('distinct')).show()  # countDistinct
# functions ，cout() ,alias() 主要是agg的用法 ，这里是合一
df3.select('id').count()  #这里不需要用show() 注意与上面方法的差异   

+-----+--------+
|count|distinct|
+-----+--------+
|    5|       4|
+-----+--------+



5

In [114]:
import numpy as np
import pandas as pd
#df3.withColumn('new_id',fn.monotonically_increasing_id()).show() #应该用处不大
df3.show()
b = spark.createDataFrame([(1,2,3,4,5),(2,3,4,5,6)],['b','c']) #为什么只能创建行呢？ 列呢？
c = spark.createDataFrame([([e]) for e in range(5)],['c'])  # 如何创建列ne ? 【 ([e]) for e in range(5)】
#print(c.show())
#df3.withColumn('n_id',pd.DataFrame(a)).show()  #那 如何添加一列数据那边哦？
# df4 = pd.concat([df3,c],axis=1)
#df4.show()
print(c.show())
# 如何查看矩阵维度
#print(df3.describe)
#print(df3.count()) print(len(df3.first())) 产看dataframe的 维度的方法 count() len(df.first())
# df3.withColumn('c',c).show() #追加 一列失败


+---+------+------+---+------+
| id|weight|height|age|gender|
+---+------+------+---+------+
|  3| 124.1|   5.2| 23|     f|
|  5| 129.2|   5.2| 23|     m|
|  4| 144.5|   5.9| 33|     m|
|  5| 133.2|   5.7| 54|     f|
|  2| 167.2|   5.4| 45|     m|
+---+------+------+---+------+

+---+
|  c|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+

None


In [115]:
# 如何 查看数据的 矩阵呢？
df_miss = spark.createDataFrame([(1,None,1,None,'m',1000),(2,3,33,1,'f',None),(3,4,None,5,'m',200)],['id','a','b','c','gen','d'])

In [116]:
df_miss.show()

+---+----+----+----+---+----+
| id|   a|   b|   c|gen|   d|
+---+----+----+----+---+----+
|  1|null|   1|null|  m|1000|
|  2|   3|  33|   1|  f|null|
|  3|   4|null|   5|  m| 200|
+---+----+----+----+---+----+



In [117]:
df_miss.rdd.map(lambda row : ( row ['id'],sum([c==None for c in row]))).collect() #如何合并数据呢？
# 计算每行中null的个数

[(1, 2), (2, 1), (3, 1)]

In [118]:
df_miss.where('b is null').show() # 查看 符合某种条件的 数据 注意这里的用法 a is null
# df_miss.where('id<2 or id>2').show()

+---+---+----+---+---+---+
| id|  a|   b|  c|gen|  d|
+---+---+----+---+---+---+
|  3|  4|null|  5|  m|200|
+---+---+----+---+---+---+



In [119]:
df_miss.agg(*[(1-(fn.count(c)/fn.count('*'))).alias(c+'_missing') for c in df_miss.columns]).show() #xing hao
print(df_miss.agg(*[fn.countDistinct(c) for c in df_miss.columns]).show())
# 注意 fn.countDistinct(c) 的用法  df.distinct().count() 的区别与联系。

+----------+-------------------+-------------------+-------------------+-----------+-------------------+
|id_missing|          a_missing|          b_missing|          c_missing|gen_missing|          d_missing|
+----------+-------------------+-------------------+-------------------+-----------+-------------------+
|       0.0|0.33333333333333337|0.33333333333333337|0.33333333333333337|        0.0|0.33333333333333337|
+----------+-------------------+-------------------+-------------------+-----------+-------------------+

+------------------+-----------------+-----------------+-----------------+-------------------+-----------------+
|count(DISTINCT id)|count(DISTINCT a)|count(DISTINCT b)|count(DISTINCT c)|count(DISTINCT gen)|count(DISTINCT d)|
+------------------+-----------------+-----------------+-----------------+-------------------+-----------------+
|                 3|                2|                2|                2|                  2|                2|
+------------------+--

In [120]:
df_miss.dropna(thresh=1).show()  # thresh 没有用嘛？
df_miss.show()

+---+----+----+----+---+----+
| id|   a|   b|   c|gen|   d|
+---+----+----+----+---+----+
|  1|null|   1|null|  m|1000|
|  2|   3|  33|   1|  f|null|
|  3|   4|null|   5|  m| 200|
+---+----+----+----+---+----+

+---+----+----+----+---+----+
| id|   a|   b|   c|gen|   d|
+---+----+----+----+---+----+
|  1|null|   1|null|  m|1000|
|  2|   3|  33|   1|  f|null|
|  3|   4|null|   5|  m| 200|
+---+----+----+----+---+----+



In [121]:
# import tensorframs as tfs

In [28]:
import pyspark.sql.types as typ
import pandas as pd
import numpy as np
from numpy import *

In [29]:
labels = [('INFANT_ALIVE_AT_REPORT',typ.StringType()),
        ('BIRTH_YEAR',typ.IntegerType()),
        ('BIRTH_PLACE',typ.StringType()),
        ('MOTHER_AGE_YEARS',typ.IntegerType()),
        ('FATHER_COMBINED_AGE',typ.IntegerType()),
        ('CIG_BEFORE',typ.StringType()),
        ('CIG_1_TRI',typ.IntegerType()),
        ('CIG_2_TRI',typ.IntegerType()),
        ('CIG_3_TRI',typ.IntegerType()),
         ('MOTHER_HEIGHT_IN',typ.FloatType()),
         ('MOTHER_PRE_WEIGHT',typ.FloatType()),
         ('MOTHER_DELIVERY_WEIGHT',typ.FloatType()),
         ('MOTHER_WEIGHT_GAIN',typ.FloatType()),
         ('DIABETES_PRE',typ.FloatType()),
         ('DIABETES_GEST',typ.FloatType()),
         ('HYP_TENS_PRE',typ.FloatType()),
         ('HYP_TENS_GEST',typ.FloatType()),
         ('PREV_BIRTH_PRETERM',typ.FloatType())]
print(labels)
schema=typ.StructType([typ.StructField(e[0],e[1],False) for e in labels])
print(schema)

[('INFANT_ALIVE_AT_REPORT', StringType), ('BIRTH_YEAR', IntegerType), ('BIRTH_PLACE', StringType), ('MOTHER_AGE_YEARS', IntegerType), ('FATHER_COMBINED_AGE', IntegerType), ('CIG_BEFORE', StringType), ('CIG_1_TRI', IntegerType), ('CIG_2_TRI', IntegerType), ('CIG_3_TRI', IntegerType), ('MOTHER_HEIGHT_IN', FloatType), ('MOTHER_PRE_WEIGHT', FloatType), ('MOTHER_DELIVERY_WEIGHT', FloatType), ('MOTHER_WEIGHT_GAIN', FloatType), ('DIABETES_PRE', FloatType), ('DIABETES_GEST', FloatType), ('HYP_TENS_PRE', FloatType), ('HYP_TENS_GEST', FloatType), ('PREV_BIRTH_PRETERM', FloatType)]
StructType(List(StructField(INFANT_ALIVE_AT_REPORT,StringType,false),StructField(BIRTH_YEAR,IntegerType,false),StructField(BIRTH_PLACE,StringType,false),StructField(MOTHER_AGE_YEARS,IntegerType,false),StructField(FATHER_COMBINED_AGE,IntegerType,false),StructField(CIG_BEFORE,StringType,false),StructField(CIG_1_TRI,IntegerType,false),StructField(CIG_2_TRI,IntegerType,false),StructField(CIG_3_TRI,IntegerType,false),Struct

In [41]:
births =spark.read.csv("file:///home/fordl/births_train.csv.gz",header=True)#,schema=schema)

In [42]:
#births.show()
recode_dictionary={
    'YUN':{
        'Y':1,
        'N':0,
        'U':0
    }
}
#print(recode_dictionary['YUN']['N'])

In [43]:
selected_features=['INFANT_ALIVE_AT_REPORT','BIRTH_YEAR','BIRTH_PLACE','MOTHER_AGE_YEARS',
                   'FATHER_COMBINED_AGE','CIG_BEFORE', 'CIG_1_TRI','CIG_2_TRI','CIG_3_TRI','MOTHER_HEIGHT_IN',
                   'MOTHER_PRE_WEIGHT','MOTHER_DELIVERY_WEIGHT','MOTHER_WEIGHT_GAIN',
                   'DIABETES_PRE','DIABETES_GEST','HYP_TENS_PRE','HYP_TENS_GEST','PREV_BIRTH_PRETERM']
births_trimmed = births.select(selected_features)

In [44]:
#births_trimmed.select("BIRTH_YEAR").where('BIRTH_YEAR<2015').show()
# df3.where('height>5 and weight<142').show() # 查看 符合某种条件的 数据 满足多种条件的筛选方法  多种数据的筛选方法
births_trimmed.count()  #如何进行 group by 操作

45429

In [45]:
import pyspark.sql.functions as func
def recode(col,key):
    return recode_dictionary[key][col]
def correct_cig(feat):
    return func.when(func.col(feat)!=99,func.col(feat)).otherwise(0) 
#这里的用法注意的 func.when(func.col(feat)!=99.func.col(feat))
rec_integer = func.udf(recode,typ.IntegerType()) #调用 func.udf

In [46]:
print(births_trimmed.select('CIG_BEFORE','CIG_1_TRI').show(1)) 
# 利用head 来提取数据就好了，那么如何提取前面几个数字呢
# HEAD ： DATA.HEAD（）  或者 data.show(3),再或者如上所示。
births_transformed=births_trimmed.withColumn('CIG_BEFORE',correct_cig('CIG_BEFORE'))\
.withColumn('CIG_1_TRI',correct_cig('CIG_1_TRI'))\
.withColumn('CIG_2_TRI',correct_cig('CIG_2_TRI'))\
.withColumn('CIG_3_TRI',correct_cig('CIG_3_TRI'))
print(births_transformed.select("CIG_1_TRI","CIG_2_TRI").show(1))

+----------+---------+
|CIG_BEFORE|CIG_1_TRI|
+----------+---------+
|        99|       99|
+----------+---------+
only showing top 1 row

None
+---------+---------+
|CIG_1_TRI|CIG_2_TRI|
+---------+---------+
|        0|        0|
+---------+---------+
only showing top 1 row

None


In [47]:
print(births_trimmed.schema) #应该是对列名的描shu
cols=[(col.name,col.dataType) for col in births_trimmed.schema]
print(cols[0]) #提取对应的列名 及 其属性。
print(cols[1])

StructType(List(StructField(INFANT_ALIVE_AT_REPORT,StringType,true),StructField(BIRTH_YEAR,StringType,true),StructField(BIRTH_PLACE,StringType,true),StructField(MOTHER_AGE_YEARS,StringType,true),StructField(FATHER_COMBINED_AGE,StringType,true),StructField(CIG_BEFORE,StringType,true),StructField(CIG_1_TRI,StringType,true),StructField(CIG_2_TRI,StringType,true),StructField(CIG_3_TRI,StringType,true),StructField(MOTHER_HEIGHT_IN,StringType,true),StructField(MOTHER_PRE_WEIGHT,StringType,true),StructField(MOTHER_DELIVERY_WEIGHT,StringType,true),StructField(MOTHER_WEIGHT_GAIN,StringType,true),StructField(DIABETES_PRE,StringType,true),StructField(DIABETES_GEST,StringType,true),StructField(HYP_TENS_PRE,StringType,true),StructField(HYP_TENS_GEST,StringType,true),StructField(PREV_BIRTH_PRETERM,StringType,true)))
('INFANT_ALIVE_AT_REPORT', StringType)
('BIRTH_YEAR', StringType)


In [48]:
YUN_cols=[]
for i,s in enumerate(cols):
    if s[1]==typ.StringType():
        #print(s[0],"****")
        dis =births.select(s[0]).distinct().rdd.map(lambda row:row[0]).collect() #这里是什么意思？
        #print(dis,"____")
    if 'Y' in dis:
        YUN_cols.append(s[0])
print(YUN_cols)

['INFANT_ALIVE_AT_REPORT', 'DIABETES_PRE', 'DIABETES_GEST', 'HYP_TENS_PRE', 'HYP_TENS_GEST', 'PREV_BIRTH_PRETERM']


In [52]:
births.select(cols[1][0]).distinct().rdd.map(lambda row:row[0]).collect() #用来查看每个一个zhi

['2014', '2015']

In [53]:
births_trimmed.select('BIRTH_YEAR').take(5)

[Row(BIRTH_YEAR='2015'),
 Row(BIRTH_YEAR='2015'),
 Row(BIRTH_YEAR='2015'),
 Row(BIRTH_YEAR='2015'),
 Row(BIRTH_YEAR='2015')]

In [51]:
birthss =spark.read.csv("file:///home/fordl/births_train.csv.gz",header=True) #注意这个梗
birthss.select(['INFANT_NICU_ADMISSION',rec_integer('INFANT_NICU_ADMISSION',func.lit('YUN')).alias('INFANT_NICU_ADMISSION_RECODE')]).take(5)

[Row(INFANT_NICU_ADMISSION='Y', INFANT_NICU_ADMISSION_RECODE=1),
 Row(INFANT_NICU_ADMISSION='Y', INFANT_NICU_ADMISSION_RECODE=1),
 Row(INFANT_NICU_ADMISSION='U', INFANT_NICU_ADMISSION_RECODE=0),
 Row(INFANT_NICU_ADMISSION='N', INFANT_NICU_ADMISSION_RECODE=0),
 Row(INFANT_NICU_ADMISSION='U', INFANT_NICU_ADMISSION_RECODE=0)]

In [54]:
exprs_YNU=[rec_integer(x,func.lit('YUN')).alias(x)
          if x in YUN_cols
          else x
          for x in births_transformed.columns]
births_transformed = births_transformed.select(exprs_YNU)
births_transformed.select(YUN_cols[-5:]).take(3)

[Row(DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, PREV_BIRTH_PRETERM=0),
 Row(DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, PREV_BIRTH_PRETERM=0),
 Row(DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, PREV_BIRTH_PRETERM=0)]

In [55]:
import pyspark.mllib.stat as st
import numpy as np
numeric_cols=['MOTHER_AGE_YEARS','FATHER_COMBINED_AGE','CIG_BEFORE','CIG_1_TRI','CIG_2_TRI','CIG_3_TRI',
             'MOTHER_HEIGHT_IN','MOTHER_PRE_WEIGHT','MOTHER_DELIVERY_WEIGHT','MOTHER_WEIGHT_GAIN']
numeric_rdd = births_transformed.select(numeric_cols).rdd.map(lambda row : [e for e in row])
mllib_stats = st.Statistics.colStats(numeric_rdd)
for col,m,v in zip(numeric_cols,mllib_stats.mean(),mllib_stats.variance()):
    print('{0}:\t{1:.2f}\t{2:.2f}'.format(col,m,np.sqrt(v)))

MOTHER_AGE_YEARS:	28.30	6.08
FATHER_COMBINED_AGE:	44.55	27.55
CIG_BEFORE:	1.43	5.18
CIG_1_TRI:	0.91	3.83
CIG_2_TRI:	0.70	3.31
CIG_3_TRI:	0.58	3.11
MOTHER_HEIGHT_IN:	65.12	6.45
MOTHER_PRE_WEIGHT:	214.50	210.21
MOTHER_DELIVERY_WEIGHT:	223.63	180.01
MOTHER_WEIGHT_GAIN:	30.74	26.23


In [56]:
categorical_cols = [e for e in births_transformed.columns if e not in numeric_cols]
categorical_rdd = births_transformed.select(categorical_cols).rdd.map(lambda row: [e for e in row])
for i ,col in enumerate(categorical_cols):
    agg =categorical_rdd.groupBy(lambda row: row[i]).map(lambda row:(row[0],len(row[1])))
    print(col,sorted(agg.collect(),key=lambda el:el[1],reverse=True))

INFANT_ALIVE_AT_REPORT [(1, 23349), (0, 22080)]
BIRTH_YEAR [('2014', 22842), ('2015', 22587)]
BIRTH_PLACE [('1', 44558), ('4', 327), ('3', 224), ('2', 136), ('7', 91), ('5', 74), ('6', 11), ('9', 8)]
DIABETES_PRE [(0, 44881), (1, 548)]
DIABETES_GEST [(0, 43451), (1, 1978)]
HYP_TENS_PRE [(0, 44348), (1, 1081)]
HYP_TENS_GEST [(0, 43302), (1, 2127)]
PREV_BIRTH_PRETERM [(0, 43088), (1, 2341)]


In [57]:
corrs = st.Statistics.corr(numeric_rdd)
for i,el in enumerate(corrs>0.5):
    correlated = [
        (numeric_cols[j],corrs[i][j])
        for j ,e in enumerate(el)
        if e==1.0 and j!=i]
    if len(correlated)>0:
        for e in correlated:
            print('{0}-to-{1}:{2:.2f}'.format(numeric_cols[i],e[0],e[1]))

CIG_BEFORE-to-CIG_1_TRI:0.83
CIG_BEFORE-to-CIG_2_TRI:0.72
CIG_BEFORE-to-CIG_3_TRI:0.62
CIG_1_TRI-to-CIG_BEFORE:0.83
CIG_1_TRI-to-CIG_2_TRI:0.87
CIG_1_TRI-to-CIG_3_TRI:0.76
CIG_2_TRI-to-CIG_BEFORE:0.72
CIG_2_TRI-to-CIG_1_TRI:0.87
CIG_2_TRI-to-CIG_3_TRI:0.89
CIG_3_TRI-to-CIG_BEFORE:0.62
CIG_3_TRI-to-CIG_1_TRI:0.76
CIG_3_TRI-to-CIG_2_TRI:0.89
MOTHER_PRE_WEIGHT-to-MOTHER_DELIVERY_WEIGHT:0.54
MOTHER_PRE_WEIGHT-to-MOTHER_WEIGHT_GAIN:0.65
MOTHER_DELIVERY_WEIGHT-to-MOTHER_PRE_WEIGHT:0.54
MOTHER_DELIVERY_WEIGHT-to-MOTHER_WEIGHT_GAIN:0.60
MOTHER_WEIGHT_GAIN-to-MOTHER_PRE_WEIGHT:0.65
MOTHER_WEIGHT_GAIN-to-MOTHER_DELIVERY_WEIGHT:0.60


In [58]:
features_to_keep=['INFANT_ALIVE_AT_REPORT',
                 'BIRTH_PLACE',
                 'MOTHER_AGE_YEARS',
                 'FATHER_COMBINED_AGE',
                 'CIG_1_TRI',
                 'MOTHER_HEIGHT_IN',
                 'MOTHER_PRE_WEIGHT',
                 'DIABETES_PRE',
                 'DIABETES_GEST',
                 'HYP_TENS_GEST',
                 'HYP_TENS_PRE',
                 'PREV_BIRTH_PRETERM']
births_transformed=births_transformed.select([e for e in features_to_keep])

In [59]:
import pyspark.mllib.linalg as ln
for cat in categorical_cols[1:]:
    agg = births_transformed.groupBy('INFANT_ALIVE_AT_REPORT').pivot(cat).count()
    print(agg.show())
    agg_rdd =agg.rdd.map(lambda row:(row[1:])).flatMap(lambda row: [0 if e ==None else e for e in row]).collect()
    print(agg_rdd)
    row_length = len(agg.collect()[0])-1
    agg =ln.Matrices.dense(row_length,2,agg_rdd)
    test = st.Statistics.chiSqTest(agg)
    print(cat,round(test.pValue,4))

AnalysisException: "cannot resolve '`BIRTH_YEAR`' given input columns: [MOTHER_PRE_WEIGHT, MOTHER_AGE_YEARS, HYP_TENS_GEST, MOTHER_HEIGHT_IN, DIABETES_GEST, CIG_1_TRI, BIRTH_PLACE, INFANT_ALIVE_AT_REPORT, PREV_BIRTH_PRETERM, FATHER_COMBINED_AGE, HYP_TENS_PRE, DIABETES_PRE];;\n'Project ['BIRTH_YEAR]\n+- Project [INFANT_ALIVE_AT_REPORT#1331, BIRTH_PLACE#867, MOTHER_AGE_YEARS#868, FATHER_COMBINED_AGE#871, CIG_1_TRI#1046, MOTHER_HEIGHT_IN#878, MOTHER_PRE_WEIGHT#880, DIABETES_PRE#1332, DIABETES_GEST#1333, HYP_TENS_GEST#1335, HYP_TENS_PRE#1334, PREV_BIRTH_PRETERM#1336]\n   +- Project [recode(INFANT_ALIVE_AT_REPORT#864, YUN) AS INFANT_ALIVE_AT_REPORT#1331, BIRTH_YEAR#865, BIRTH_PLACE#867, MOTHER_AGE_YEARS#868, FATHER_COMBINED_AGE#871, CIG_BEFORE#1026, CIG_1_TRI#1046, CIG_2_TRI#1066, CIG_3_TRI#1086, MOTHER_HEIGHT_IN#878, MOTHER_PRE_WEIGHT#880, MOTHER_DELIVERY_WEIGHT#881, MOTHER_WEIGHT_GAIN#882, recode(DIABETES_PRE#883, YUN) AS DIABETES_PRE#1332, recode(DIABETES_GEST#884, YUN) AS DIABETES_GEST#1333, recode(HYP_TENS_PRE#885, YUN) AS HYP_TENS_PRE#1334, recode(HYP_TENS_GEST#886, YUN) AS HYP_TENS_GEST#1335, recode(PREV_BIRTH_PRETERM#887, YUN) AS PREV_BIRTH_PRETERM#1336]\n      +- Project [INFANT_ALIVE_AT_REPORT#864, BIRTH_YEAR#865, BIRTH_PLACE#867, MOTHER_AGE_YEARS#868, FATHER_COMBINED_AGE#871, CIG_BEFORE#1026, CIG_1_TRI#1046, CIG_2_TRI#1066, CASE WHEN NOT (cast(CIG_3_TRI#877 as int) = 99) THEN CIG_3_TRI#877 ELSE cast(0 as string) END AS CIG_3_TRI#1086, MOTHER_HEIGHT_IN#878, MOTHER_PRE_WEIGHT#880, MOTHER_DELIVERY_WEIGHT#881, MOTHER_WEIGHT_GAIN#882, DIABETES_PRE#883, DIABETES_GEST#884, HYP_TENS_PRE#885, HYP_TENS_GEST#886, PREV_BIRTH_PRETERM#887]\n         +- Project [INFANT_ALIVE_AT_REPORT#864, BIRTH_YEAR#865, BIRTH_PLACE#867, MOTHER_AGE_YEARS#868, FATHER_COMBINED_AGE#871, CIG_BEFORE#1026, CIG_1_TRI#1046, CASE WHEN NOT (cast(CIG_2_TRI#876 as int) = 99) THEN CIG_2_TRI#876 ELSE cast(0 as string) END AS CIG_2_TRI#1066, CIG_3_TRI#877, MOTHER_HEIGHT_IN#878, MOTHER_PRE_WEIGHT#880, MOTHER_DELIVERY_WEIGHT#881, MOTHER_WEIGHT_GAIN#882, DIABETES_PRE#883, DIABETES_GEST#884, HYP_TENS_PRE#885, HYP_TENS_GEST#886, PREV_BIRTH_PRETERM#887]\n            +- Project [INFANT_ALIVE_AT_REPORT#864, BIRTH_YEAR#865, BIRTH_PLACE#867, MOTHER_AGE_YEARS#868, FATHER_COMBINED_AGE#871, CIG_BEFORE#1026, CASE WHEN NOT (cast(CIG_1_TRI#875 as int) = 99) THEN CIG_1_TRI#875 ELSE cast(0 as string) END AS CIG_1_TRI#1046, CIG_2_TRI#876, CIG_3_TRI#877, MOTHER_HEIGHT_IN#878, MOTHER_PRE_WEIGHT#880, MOTHER_DELIVERY_WEIGHT#881, MOTHER_WEIGHT_GAIN#882, DIABETES_PRE#883, DIABETES_GEST#884, HYP_TENS_PRE#885, HYP_TENS_GEST#886, PREV_BIRTH_PRETERM#887]\n               +- Project [INFANT_ALIVE_AT_REPORT#864, BIRTH_YEAR#865, BIRTH_PLACE#867, MOTHER_AGE_YEARS#868, FATHER_COMBINED_AGE#871, CASE WHEN NOT (cast(CIG_BEFORE#874 as int) = 99) THEN CIG_BEFORE#874 ELSE cast(0 as string) END AS CIG_BEFORE#1026, CIG_1_TRI#875, CIG_2_TRI#876, CIG_3_TRI#877, MOTHER_HEIGHT_IN#878, MOTHER_PRE_WEIGHT#880, MOTHER_DELIVERY_WEIGHT#881, MOTHER_WEIGHT_GAIN#882, DIABETES_PRE#883, DIABETES_GEST#884, HYP_TENS_PRE#885, HYP_TENS_GEST#886, PREV_BIRTH_PRETERM#887]\n                  +- Project [INFANT_ALIVE_AT_REPORT#864, BIRTH_YEAR#865, BIRTH_PLACE#867, MOTHER_AGE_YEARS#868, FATHER_COMBINED_AGE#871, CIG_BEFORE#874, CIG_1_TRI#875, CIG_2_TRI#876, CIG_3_TRI#877, MOTHER_HEIGHT_IN#878, MOTHER_PRE_WEIGHT#880, MOTHER_DELIVERY_WEIGHT#881, MOTHER_WEIGHT_GAIN#882, DIABETES_PRE#883, DIABETES_GEST#884, HYP_TENS_PRE#885, HYP_TENS_GEST#886, PREV_BIRTH_PRETERM#887]\n                     +- Relation[INFANT_ALIVE_AT_REPORT#864,BIRTH_YEAR#865,BIRTH_MONTH#866,BIRTH_PLACE#867,MOTHER_AGE_YEARS#868,MOTHER_RACE_6CODE#869,MOTHER_EDUCATION#870,FATHER_COMBINED_AGE#871,FATHER_EDUCATION#872,MONTH_PRECARE_RECODE#873,CIG_BEFORE#874,CIG_1_TRI#875,CIG_2_TRI#876,CIG_3_TRI#877,MOTHER_HEIGHT_IN#878,MOTHER_BMI_RECODE#879,MOTHER_PRE_WEIGHT#880,MOTHER_DELIVERY_WEIGHT#881,MOTHER_WEIGHT_GAIN#882,DIABETES_PRE#883,DIABETES_GEST#884,HYP_TENS_PRE#885,HYP_TENS_GEST#886,PREV_BIRTH_PRETERM#887,... 30 more fields] csv\n"

In [139]:
print(ln.Matrices.dense(3,2,[1,2,3,4,5,6]))

DenseMatrix([[ 1.,  4.],
             [ 2.,  5.],
             [ 3.,  6.]])


In [60]:
import pyspark.mllib.feature as  tf
import pyspark.mllib.regression as reg
import pyspark.mllib.linalg as ln
hashing =tf.HashingTF(7)

In [64]:
births_hashed = births_transformed.rdd.map(lambda row:[list(hashing.transform(row[1]).toArray()) if col =='BIRTH_PLACE' else row[i] for i,col in enumerate(features_to_keep)]).map(lambda row :[[e] if type(e)== int else e for e in row]).map(lambda row :[item for sublist in row for item in sublist]).map(lambda row :reg.LabeledPoint(row[0],ln.Vectors.dense(row[1:])))

In [65]:
births_train,births_test=births_hashed.randomSplit([0.6,0.4])


In [68]:
print(births_train.count())
#print(len(births_train.first()))

27268


In [63]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS
lr_model = LogisticRegressionWithLBFGS.train(births_train,iterations=10)

Py4JJavaError: An error occurred while calling o1795.trainLogisticRegressionModelWithLBFGS.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 121.0 failed 1 times, most recent failure: Lost task 0.0 in stage 121.0 (TID 7086, localhost, executor driver): java.lang.IllegalArgumentException: requirement failed: Dimensions mismatch when adding new sample. Expecting 22 but got 23.
	at scala.Predef$.require(Predef.scala:224)
	at org.apache.spark.mllib.stat.MultivariateOnlineSummarizer.add(MultivariateOnlineSummarizer.scala:87)
	at org.apache.spark.ml.classification.LogisticRegression$$anonfun$15.apply(LogisticRegression.scala:509)
	at org.apache.spark.ml.classification.LogisticRegression$$anonfun$15.apply(LogisticRegression.scala:508)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:214)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1336)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1136)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1136)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$25.apply(RDD.scala:1137)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$25.apply(RDD.scala:1137)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2119)
	at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1026)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.reduce(RDD.scala:1008)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1128)
	at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:517)
	at org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS.runWithMlLogisticRegression$1(LogisticRegression.scala:453)
	at org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS.run(LogisticRegression.scala:459)
	at org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS.run(LogisticRegression.scala:425)
	at org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS.run(LogisticRegression.scala:355)
	at org.apache.spark.mllib.api.python.PythonMLLibAPI.trainRegressionModel(PythonMLLibAPI.scala:92)
	at org.apache.spark.mllib.api.python.PythonMLLibAPI.trainLogisticRegressionModelWithLBFGS(PythonMLLibAPI.scala:308)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: requirement failed: Dimensions mismatch when adding new sample. Expecting 22 but got 23.
	at scala.Predef$.require(Predef.scala:224)
	at org.apache.spark.mllib.stat.MultivariateOnlineSummarizer.add(MultivariateOnlineSummarizer.scala:87)
	at org.apache.spark.ml.classification.LogisticRegression$$anonfun$15.apply(LogisticRegression.scala:509)
	at org.apache.spark.ml.classification.LogisticRegression$$anonfun$15.apply(LogisticRegression.scala:508)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:214)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1336)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1136)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1136)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$25.apply(RDD.scala:1137)
	at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$25.apply(RDD.scala:1137)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:797)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [69]:
selector = tf.ChiSqSelector(4).fit(births_train)

Py4JJavaError: An error occurred while calling o1878.fitChiSqSelector.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 126.0 failed 1 times, most recent failure: Lost task 0.0 in stage 126.0 (TID 7091, localhost, executor driver): java.lang.IndexOutOfBoundsException: 21 not in [-21,21)
	at breeze.linalg.DenseVector$mcD$sp.apply$mcD$sp(DenseVector.scala:72)
	at breeze.linalg.DenseVector$mcD$sp.apply(DenseVector.scala:71)
	at breeze.linalg.DenseVector$mcD$sp.apply(DenseVector.scala:53)
	at breeze.linalg.TensorLike$class.apply$mcID$sp(Tensor.scala:107)
	at breeze.linalg.DenseVector.apply$mcID$sp(DenseVector.scala:53)
	at org.apache.spark.mllib.stat.test.ChiSqTest$$anonfun$3$$anonfun$apply$1$$anonfun$apply$3.apply(ChiSqTest.scala:119)
	at org.apache.spark.mllib.stat.test.ChiSqTest$$anonfun$3$$anonfun$apply$1$$anonfun$apply$3.apply(ChiSqTest.scala:118)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.Range.foreach(Range.scala:160)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.spark.mllib.stat.test.ChiSqTest$$anonfun$3$$anonfun$apply$1.apply(ChiSqTest.scala:118)
	at org.apache.spark.mllib.stat.test.ChiSqTest$$anonfun$3$$anonfun$apply$1.apply(ChiSqTest.scala:102)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:191)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$countByKey$1.apply(PairRDDFunctions.scala:373)
	at org.apache.spark.rdd.PairRDDFunctions$$anonfun$countByKey$1.apply(PairRDDFunctions.scala:373)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.PairRDDFunctions.countByKey(PairRDDFunctions.scala:372)
	at org.apache.spark.rdd.RDD$$anonfun$countByValue$1.apply(RDD.scala:1204)
	at org.apache.spark.rdd.RDD$$anonfun$countByValue$1.apply(RDD.scala:1204)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
	at org.apache.spark.rdd.RDD.countByValue(RDD.scala:1203)
	at org.apache.spark.mllib.stat.test.ChiSqTest$.chiSquaredFeatures(ChiSqTest.scala:124)
	at org.apache.spark.mllib.stat.Statistics$.chiSqTest(Statistics.scala:176)
	at org.apache.spark.mllib.feature.ChiSqSelector.fit(ChiSqSelector.scala:257)
	at org.apache.spark.mllib.api.python.PythonMLLibAPI.fitChiSqSelector(PythonMLLibAPI.scala:652)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IndexOutOfBoundsException: 21 not in [-21,21)
	at breeze.linalg.DenseVector$mcD$sp.apply$mcD$sp(DenseVector.scala:72)
	at breeze.linalg.DenseVector$mcD$sp.apply(DenseVector.scala:71)
	at breeze.linalg.DenseVector$mcD$sp.apply(DenseVector.scala:53)
	at breeze.linalg.TensorLike$class.apply$mcID$sp(Tensor.scala:107)
	at breeze.linalg.DenseVector.apply$mcID$sp(DenseVector.scala:53)
	at org.apache.spark.mllib.stat.test.ChiSqTest$$anonfun$3$$anonfun$apply$1$$anonfun$apply$3.apply(ChiSqTest.scala:119)
	at org.apache.spark.mllib.stat.test.ChiSqTest$$anonfun$3$$anonfun$apply$1$$anonfun$apply$3.apply(ChiSqTest.scala:118)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.Range.foreach(Range.scala:160)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
	at org.apache.spark.mllib.stat.test.ChiSqTest$$anonfun$3$$anonfun$apply$1.apply(ChiSqTest.scala:118)
	at org.apache.spark.mllib.stat.test.ChiSqTest$$anonfun$3$$anonfun$apply$1.apply(ChiSqTest.scala:102)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:191)
	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [114]:
# wget http://www.tomdrabas.com/data/LearningPySpark/births_transformed.csv.gz

labelx = [('INFANT_ALIVE_AT_REPORT',typ.IntegerType()),
        ('BIRTH_PLACE',typ.StringType()),
        ('MOTHER_AGE_YEARS',typ.IntegerType()),
        ('FATHER_COMBINED_AGE',typ.IntegerType()),
        ('CIG_BEFORE',typ.IntegerType()),
        ('CIG_1_TRI',typ.IntegerType()),
        ('CIG_2_TRI',typ.IntegerType()),
        ('CIG_3_TRI',typ.IntegerType()),
         ('MOTHER_HEIGHT_IN',typ.IntegerType()),
         ('MOTHER_PRE_WEIGHT',typ.IntegerType()),
         ('MOTHER_DELIVERY_WEIGHT',typ.IntegerType()),
         ('MOTHER_WEIGHT_GAIN',typ.IntegerType()),
         ('DIABETES_PRE',typ.IntegerType()),
         ('DIABETES_GEST',typ.IntegerType()),
         ('HYP_TENS_PRE',typ.IntegerType()),
         ('HYP_TENS_GEST',typ.IntegerType()),
         ('PREV_BIRTH_PRETERM',typ.IntegerType())]
shema = typ.StructType([typ.StructField(e[0],e[1],False) for e in labelx])
data = spark.read.csv('file:///home/fordl/births_transformed.csv.gz',header=True,schema=shema)

In [115]:
import pyspark.ml.feature as tf

In [116]:
data = data.withColumn("BIRTH_PLACE_INT",data['BIRTH_PLACE'].cast(typ.IntegerType()))

In [117]:
data.select("BIRTH_PLACE").show(4)
type(data.select('BIRTH_PLACE'))
data.select("INFANT_ALIVE_AT_REPORT").show(5)

+-----------+
|BIRTH_PLACE|
+-----------+
|          1|
|          1|
|          1|
|          1|
+-----------+
only showing top 4 rows

+----------------------+
|INFANT_ALIVE_AT_REPORT|
+----------------------+
|                     0|
|                     0|
|                     0|
|                     0|
|                     0|
+----------------------+
only showing top 5 rows



In [126]:
encoder = tf.OneHotEncoder(inputCol = 'BIRTH_PLACE_INT',outputCol='BIRHT_PLACE_VEC')

In [127]:
featureCreator = tf.VectorAssembler(inputCols=[col[0] for col in labels[2:]] + [encoder.getOutputCol()],outputCol='features')

In [128]:
import pyspark.ml.classification as cl

In [129]:
logistic = cl.LogisticRegression(maxIter=10,regParam=0,labelCol="INFANT_ALIVE_AT_REPORT")

In [130]:
from pyspark.ml import Pipeline

In [131]:
pipeline = Pipeline(stages=[encoder,featureCreator,logistic])

In [132]:
d_train,d_test = data.randomSplit([0.7,0.3],seed=666)

In [133]:
model = pipeline.fit(d_train)
test_model = model.transform(d_test)

In [134]:
test_model.first()

Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=13, FATHER_COMBINED_AGE=99, CIG_BEFORE=0, CIG_1_TRI=0, CIG_2_TRI=0, CIG_3_TRI=0, MOTHER_HEIGHT_IN=66, MOTHER_PRE_WEIGHT=133, MOTHER_DELIVERY_WEIGHT=135, MOTHER_WEIGHT_GAIN=2, DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, PREV_BIRTH_PRETERM=0, BIRTH_PLACE_INT=1, BIRHT_PLACE_VEC=SparseVector(9, {1: 1.0}), features=SparseVector(24, {0: 13.0, 1: 99.0, 6: 66.0, 7: 133.0, 8: 135.0, 9: 2.0, 16: 1.0}), rawPrediction=DenseVector([1.1516, -1.1516]), probability=DenseVector([0.7598, 0.2402]), prediction=0.0)

In [135]:
import pyspark.ml.evaluation as ev

In [136]:
evaluator = ev.BinaryClassificationEvaluator(rawPredictionCol='probability',labelCol='INFANT_ALIVE_AT_REPORT')

In [137]:
print(evaluator.evaluate(test_model,{evaluator.metricName:'areaUnderROC'}))
print(evaluator.evaluate(test_model,{evaluator.metricName:'areaUnderPR'}))

0.741891349641666
0.7151539716088794


In [144]:
# 模型保存
pipelinepath = './ml_lr_pipeline'
pipeline.write().overwrite().save(pipelinepath)
loadedpipeline = Pipeline.load(pipelinepath)
loadedpipeline.fit(d_train).transform(d_test).take(2)

[Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=13, FATHER_COMBINED_AGE=99, CIG_BEFORE=0, CIG_1_TRI=0, CIG_2_TRI=0, CIG_3_TRI=0, MOTHER_HEIGHT_IN=66, MOTHER_PRE_WEIGHT=133, MOTHER_DELIVERY_WEIGHT=135, MOTHER_WEIGHT_GAIN=2, DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, PREV_BIRTH_PRETERM=0, BIRTH_PLACE_INT=1, BIRHT_PLACE_VEC=SparseVector(9, {1: 1.0}), features=SparseVector(24, {0: 13.0, 1: 99.0, 6: 66.0, 7: 133.0, 8: 135.0, 9: 2.0, 16: 1.0}), rawPrediction=DenseVector([1.1516, -1.1516]), probability=DenseVector([0.7598, 0.2402]), prediction=0.0),
 Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=14, FATHER_COMBINED_AGE=99, CIG_BEFORE=0, CIG_1_TRI=0, CIG_2_TRI=0, CIG_3_TRI=0, MOTHER_HEIGHT_IN=63, MOTHER_PRE_WEIGHT=93, MOTHER_DELIVERY_WEIGHT=100, MOTHER_WEIGHT_GAIN=0, DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, PREV_BIRTH_PRETERM=0, BIRTH_PLACE_INT=1, BIRHT_PLACE_VEC=SparseVector(9, {1: 1.0}), features=SparseVector(

In [145]:
from pyspark.ml import PipelineModel
modelpath = './pipemodel'
model.write().overwrite().save(modelpath)
loadedPipelineModel = PipelineModel.load(modelpath)
test_reloadmodel = loadedPipelineModel.transform(d_test)
print(test_reloadmodel.take(3))


[Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=13, FATHER_COMBINED_AGE=99, CIG_BEFORE=0, CIG_1_TRI=0, CIG_2_TRI=0, CIG_3_TRI=0, MOTHER_HEIGHT_IN=66, MOTHER_PRE_WEIGHT=133, MOTHER_DELIVERY_WEIGHT=135, MOTHER_WEIGHT_GAIN=2, DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, PREV_BIRTH_PRETERM=0, BIRTH_PLACE_INT=1, BIRHT_PLACE_VEC=SparseVector(9, {1: 1.0}), features=SparseVector(24, {0: 13.0, 1: 99.0, 6: 66.0, 7: 133.0, 8: 135.0, 9: 2.0, 16: 1.0}), rawPrediction=DenseVector([1.1516, -1.1516]), probability=DenseVector([0.7598, 0.2402]), prediction=0.0), Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=14, FATHER_COMBINED_AGE=99, CIG_BEFORE=0, CIG_1_TRI=0, CIG_2_TRI=0, CIG_3_TRI=0, MOTHER_HEIGHT_IN=63, MOTHER_PRE_WEIGHT=93, MOTHER_DELIVERY_WEIGHT=100, MOTHER_WEIGHT_GAIN=0, DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, PREV_BIRTH_PRETERM=0, BIRTH_PLACE_INT=1, BIRHT_PLACE_VEC=SparseVector(9, {1: 1.0}), features=SparseVector(2

In [146]:
import pyspark.ml.tuning as tune

In [148]:
logistic = cl.LogisticRegression(labelCol='INFANT_ALIVE_AT_REPORT')
grid = tune.ParamGridBuilder().addGrid(logistic.maxIter,[2,10,50]).addGrid(logistic.regParam,[0.01,0.05,0.3]).build()

In [149]:
evaluator = ev.BinaryClassificationEvaluator(rawPredictionCol='probability',labelCol='INFANT_ALIVE_AT_REPORT')

In [152]:
cv = tune.CrossValidator(estimator = logistic,estimatorParamMaps =grid,evaluator=evaluator)
pipeline =Pipeline(stages=[encoder,featureCreator])
data_transformer = pipeline.fit(d_train)
cv_model = cv.fit(data_transformer.transform(d_train))

In [157]:
data_train = data_transformer.transform(d_test)
results = cv_model.transform(data_train)
print(evaluator.evaluate(results,{evaluator.metricName:'areaUnderROC'}))
print(evaluator.evaluate(results,{evaluator.metricName:'areaUnderPR'}))

0.7404526641072416
0.7157767684747429


In [163]:
results =[(
    [ {key.name:paramValue}
    for key,paramValue
    in zip (params.keys(),
           params.values())],metric)
    for params,metric in zip(cv_model.getEstimatorParamMaps(),
                            cv_model.avgMetrics)]

sorted(results,key=lambda el:el[1], reverse=True)[0]

([{'maxIter': 50}, {'regParam': 0.01}], 0.738652833807851)

In [166]:
selector =tf.ChiSqSelector(numTopFeatures=5,featuresCol=featureCreator.getOutputCol(),
                          outputCol='selectedFeatures',labelCol='INFANT_ALIVE_AT_REPORT')


In [167]:
logistic = cl.LogisticRegression(labelCol='INFANT_ALIVE_AT_REPORT',featuresCol='selectedFeatures')
pipeline = Pipeline(stages=[encoder,featureCreator,selector])
data_transformer= pipeline.fit(d_train)
tvs = tune.TrainValidationSplit(estimator=logistic,estimatorParamMaps=grid,evaluator=evaluator)


In [168]:
tvsmodel=tvs.fit(data_transformer.transform(d_train))
data_train = data_transformer.transform(d_test)
results = tvsmodel.transform(data_train)
print(evaluator.evaluate(results,{evaluator.metricName:'areaUnderROC'}))
print(evaluator.evaluate(results,{evaluator.metricName:'areaUnderPR'}))

0.7294296314442145
0.7037759446410553


In [39]:
import tensorflow as tf

  return f(*args, **kwds)
