# Create  Spark Context 

In [2]:
import pyspark
from pyspark.sql import SQLContext

In [5]:
try : 
    sc = pyspark.SparkContext('local[*]')
except Exception as e : 
    print( "sc already exists ")
sqlContext = SQLContext(sc)
print("done")

done


# Let's see SC works fine 

In [4]:
# do something to prove it works
rdd = sc.parallelize(range(100))
rdd.takeSample(False, 5)
print(rdd.collect())
sc.stop()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]


# Collect Data from Web REST API 
- url : https://openweathermap.org/forecast5 <br>
※ Let's check out how the data looks like 

온도, 파도, 기압, 습도 등등 을 가지고 결국 기상청에서 예보한 기상 정보를 카테고리를 예측하는 모델을 만들기 위해서 기존에 조금 복잡한 구조의 Json 데이터에서 훈련이 용이하도록 편집한다. 

In [7]:
import requests
from sklearn.cross_validation import train_test_split

resp = requests.get('http://openweathermap.org/data/2.5/forecast?q=London,us&mode=json&appid=b1b15e88fa797225412429c1c50c122a1')
data = resp.json()

x_data = []
y_data = []
data_list = data['list']
for raw in data_list :
    x_data.append(raw['main'])
    y_data.append(raw['weather'][0]['main'])

print("x_data : {0}".format(x_data[0:2]))
print("y_data : {0}".format(y_data[0:2]))

x_data : [{'temp_kf': 3.43, 'pressure': 1010.24, 'temp_min': 25.59, 'grnd_level': 1010.24, 'temp_max': 29.02, 'temp': 29.02, 'sea_level': 1026.07, 'humidity': 49}, {'temp_kf': 2.57, 'pressure': 1008.94, 'temp_min': 25.81, 'grnd_level': 1008.94, 'temp_max': 28.39, 'temp': 28.39, 'sea_level': 1024.73, 'humidity': 50}]
y_data : ['Clear', 'Clear']


# Train /Test 데이터 분리 
정확한 테스트를 위해 Train 데이터와 Test 데이터를 분리한다 .

In [8]:
data_train, data_test, labels_train, labels_test = train_test_split(x_data, y_data, test_size=0.20, random_state=42)
def change (x) :
    if(x == 'Clear') :
        return 0
    if(x in ['Rain', 'Clouds']) :
        return 1
#     idx = ['Clear', 'Rain', 'Clouds']
#     return idx.index(x)

labels_train = list(map(lambda x : change(x), labels_train ))
labels_test = list(map(lambda x : change(x), labels_test ))

print("data_train : {0}".format(data_train[0:1]))
print("data_test : {0}".format(data_test[0:1]))
print("labels_train : {0}".format(labels_train[0:1]))
print("labels_test : {0}".format(labels_test[0:1]))

data_train : [{'temp_kf': 0, 'pressure': 1009.64, 'temp_min': 21.22, 'grnd_level': 1009.64, 'temp_max': 21.22, 'temp': 21.22, 'sea_level': 1025.61, 'humidity': 91}]
data_test : [{'temp_kf': 0, 'pressure': 1012.97, 'temp_min': 17.27, 'grnd_level': 1012.97, 'temp_max': 17.27, 'temp': 17.27, 'sea_level': 1028.85, 'humidity': 85}]
labels_train : [1]
labels_test : [1]


# Save Data as file

In [10]:
# Train 데이터 저장 
with open('/home/jovyan/work/train6.data', 'w') as fp : 
    for data, label in zip(data_train, labels_train) :
        x_data = []
        x_data.append(label)
        for raw in data.keys() :
            x_data.append(data[raw])
        x_data.append('\n')
        #train_feed_data.append(LabeledPoint(label, x_data))
        fp.write(' '.join(map(lambda x: str(x) ,x_data)))
print("Train Data Save Done")
        
# Test 데이터 저장 
with open('/home/jovyan/work/Test6.data', 'w') as fp : 
    for data, label in zip(data_test, labels_test) :
        x_data = []
        x_data.append(label)
        for raw in data.keys() :
            x_data.append(data[raw])
        x_data.append('\n')
        #train_feed_data.append(LabeledPoint(label, x_data))
        fp.write(' '.join(map(lambda x: str(x) ,x_data)))
print("Test Data Save Done")

Train Data Save Done
Test Data Save Done


# Train Model

In [14]:
from pyspark.mllib.classification import LogisticRegressionWithLBFGS, LogisticRegressionModel
from pyspark.mllib.regression import LabeledPoint

# 파일이 있는지 확인 
# with open('/home/jovyan/work/train5.data', 'r') as fp : 
#     print(fp.read())
    
train_data = sc.textFile("file:///home/jovyan/work/train6.data")
print(train_data.collect()[1:5])
test_data = sc.textFile("file:///home/jovyan/work/Test6.data")
print(test_data.collect()[1:5])

['1 0 1015.83 21.05 1015.83 21.05 21.05 1031.67 69 ', '1 0 1010.69 16.47 1010.69 16.47 16.47 1026.5 100 ', '0 0 1012.24 22.12 1012.24 22.12 22.12 1028.2 76 ', '0 3.43 1010.24 25.59 1010.24 29.02 29.02 1026.07 49 ']
['1 0 1013.7 13.51 1013.7 13.51 13.51 1029.73 100 ', '0 0 1013.86 20.58 1013.86 20.58 20.58 1029.76 66 ', '1 0 1015.26 18.14 1015.26 18.14 18.14 1031.2 73 ', '0 0 1015.77 21.1 1015.77 21.1 21.1 1031.67 71 ']


In [16]:
def parsePoint(line):
    values = [float(x) for x in line.split(' ')[0:len(line.split(' '))-1]] 
    return LabeledPoint(values[0], values[1:])

parsedTrainData = train_data.map(parsePoint)
parsedTestData = test_data.map(parsePoint)
model = LogisticRegressionWithLBFGS.train(parsedTrainData)
print(model)

(weights=[-140.375500081,2.68111811095,4.07863933701,2.68111811095,-1.91193043301,-1.91193043301,-5.29658718641,0.175319762133], intercept=0.0)


# Test Model
Test Model Accuracy  

In [18]:
# print("labels_train : {0}".format(labels_train[0:5]))
# print("labels_test : {0}".format(labels_test[0:5]))
# Evaluating the model on training data
labelsAndPreds = parsedTestData.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda x: x[0] != x[1]).count() / float(parsedTestData.count())
print("Training Error = " + str(trainErr))

Training Error = 0.375


# Save Model

In [19]:
import random 
model.save(sc, "/home/jovyan/work/" + str(random.randrange(1,50)) + '/')
print("save model")

save model
