In [17]:
import apache_beam as beam
import pandas as pd
import time
import datetime
import random
import numpy as np

In [120]:
#### Generate synthetic data with pandas and write it to a CSV file.
#### Columns:
####   date_time   - local wall-clock time for the timestamp
####   temperature - uniformly random in [0, 100], rounded to 2 decimals
####   time_int    - the integer Unix timestamp, used later as the grouping key

N_SECONDS = 3600                # one reading per second, starting at timestamp 0
OUTPUT_CSV = "temperature.csv"
SEED = 42                       # fixed seed so Restart & Run All regenerates the same data

random.seed(SEED)

# The integer timestamp of reading i is simply i. The previous
# time.mktime(datetime.fromtimestamp(i).timetuple()) round-trip was redundant, and its
# result depends on the local timezone/DST rules of the machine running the notebook.
time_int = list(range(N_SECONDS))
date_time = [datetime.datetime.fromtimestamp(ts) for ts in time_int]
temp = [round(random.uniform(0.0, 100.0), 2) for _ in time_int]  # 2-decimal uniform readings

# Build the frame in one call instead of assigning columns one by one.
df = pd.DataFrame({"date_time": date_time, "temperature": temp, "time_int": time_int})
df.to_csv(OUTPUT_CSV, index=False)


### Possible improvement: generate readings continuously and consume them with a
### Beam streaming pipeline instead of reading a batch CSV file.

In [86]:
### Combine function that aggregates one time bucket of temperature records.

class MinMaxMeanFn(beam.CombineFn):
    """Compute max, min, mean and count of the temperatures in grouped records.

    Each value passed to ``add_input`` is an iterable of records (lists of CSV
    fields) whose index 1 holds the temperature as a string. In this notebook it
    is the value side of ``beam.GroupBy(...)``.

    The accumulator is the tuple ``(sum, min, max, count)``.
    """

    def create_accumulator(self):
        """Return an empty accumulator ``(sum, min, max, count)``."""
        # +/-inf sentinels so that any real reading replaces them, including
        # negative values or values larger than a fixed "big number".
        return (0.0, float("inf"), float("-inf"), 0)

    def add_input(self, cur_data, input):
        """Fold one iterable of records into the accumulator ``cur_data``.

        Args:
            cur_data: accumulator tuple ``(sum, min, max, count)``.
            input: iterable of records; ``float(record[1])`` is the temperature.

        Returns:
            The updated accumulator tuple.
        """
        cur_sum, cur_min, cur_max, count = cur_data
        for record in input:
            value = float(record[1])
            cur_sum += value
            cur_min = min(cur_min, value)
            cur_max = max(cur_max, value)
            # Count inside the loop so the count already in the accumulator is
            # kept (it used to be overwritten by len(input)) and so that
            # unsized iterables work too.
            count += 1
        return cur_sum, cur_min, cur_max, count

    def merge_accumulators(self, accumulators):
        """Merge partial accumulators; an empty sequence yields a fresh accumulator."""
        accumulators = list(accumulators)
        if not accumulators:
            return self.create_accumulator()
        sums, mins, maxs, counts = zip(*accumulators)
        return sum(sums), min(mins), max(maxs), sum(counts)

    def extract_output(self, cur_data):
        """Return ``{"max", "min", "avg", "count"}`` for the accumulated bucket.

        ``avg`` is formatted with two decimals. When no records were seen,
        ``avg`` is NaN (rendered as ``"nan"``).
        """
        total, lowest, highest, count = cur_data
        avg = total / count if count else float("nan")
        return {
            "max": highest,
            "min": lowest,
            "avg": "%.2f" % avg,
            "count": count,
        }

In [121]:
### Per-minute results: group readings into 60-second buckets and write the
### max/min/avg temperature of each bucket to every_1min_info*.csv.

# Input comes from an external resource: the temperature.csv file written above.
# NOTE(review): start_time/end_time are not used by the pipeline below; they only
# record the timestamp range [0, 3600) of the generated data.
start_time = 0
end_time = 3600

# The with-block runs the pipeline and waits for it to finish on exit.
with beam.Pipeline() as pipe1:
    temperature_per_1min = (
        pipe1
        | "Read from Text" >> beam.io.ReadFromText("temperature.csv", skip_header_lines=1)  # skip the CSV header
        | "split the record" >> beam.Map(lambda record: record.split(","))  # -> [date_time, temperature, time_int]
        # Records with the same time_int // 60 fall in the same minute. Floor division
        # avoids the float round-trip of int(x / 60). The window is bound as a default
        # argument so the pickled lambda does not depend on notebook globals.
        | "Group by minute" >> beam.GroupBy(lambda record, window=60: int(record[2]) // window)
        | "get min max and mean" >> beam.CombinePerKey(MinMaxMeanFn())
        # NOTE(review): the first column is the bucket index (time_int // 60), not a literal time range.
        | "Format the output" >> beam.Map(
            lambda kv: "{},{},{},{}".format(kv[0], kv[1]["max"], kv[1]["min"], kv[1]["avg"]))
        | "Write to text" >> beam.io.WriteToText(
            "every_1min_info",
            append_trailing_newlines=True,
            file_name_suffix=".csv",
            header="time_range,max_temperature,min_temperature,avg_temperature",
        )
    )



<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x160035da310>

In [122]:
### Per-5-minute results: same pipeline as the per-minute cell, with 300-second buckets.

# The with-block runs the pipeline and waits for it to finish on exit.
with beam.Pipeline() as pipe2:
    temperature_per_5min = (
        pipe2
        | "Read from Text" >> beam.io.ReadFromText("temperature.csv", skip_header_lines=1)  # skip the CSV header
        | "split the record" >> beam.Map(lambda record: record.split(","))  # -> [date_time, temperature, time_int]
        # Floor division buckets records into 5-minute windows. The window is bound as a
        # default argument so the pickled lambda does not depend on notebook globals.
        | "Group by 5 minutes" >> beam.GroupBy(lambda record, window=300: int(record[2]) // window)
        | "get min max and mean" >> beam.CombinePerKey(MinMaxMeanFn())
        # NOTE(review): the first column is the bucket index (time_int // 300), not a literal time range.
        | "Format the output" >> beam.Map(
            lambda kv: "{},{},{},{}".format(kv[0], kv[1]["max"], kv[1]["min"], kv[1]["avg"]))
        | "Write to text" >> beam.io.WriteToText(
            "every_5min_info",
            append_trailing_newlines=True,
            file_name_suffix=".csv",
            header="time_range,max_temperature,min_temperature,avg_temperature",
        )
    )


<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x1607df59490>

In [123]:
### Per-15-minute results: same pipeline as the per-minute cell, with 900-second buckets.
# The with-block runs the pipeline and waits for it to finish on exit.
with beam.Pipeline() as pipe3:
    temperature_per_15min = (
        pipe3
        | "Read from Text" >> beam.io.ReadFromText("temperature.csv", skip_header_lines=1)  # skip the CSV header
        | "split the record" >> beam.Map(lambda record: record.split(","))  # -> [date_time, temperature, time_int]
        # Floor division buckets records into 15-minute windows. The window is bound as a
        # default argument so the pickled lambda does not depend on notebook globals.
        | "Group by 15 minutes" >> beam.GroupBy(lambda record, window=900: int(record[2]) // window)
        | "get min max and mean" >> beam.CombinePerKey(MinMaxMeanFn())
        # NOTE(review): the first column is the bucket index (time_int // 900), not a literal time range.
        | "Format the output" >> beam.Map(
            lambda kv: "{},{},{},{}".format(kv[0], kv[1]["max"], kv[1]["min"], kv[1]["avg"]))
        | "Write to text" >> beam.io.WriteToText(
            "every_15min_info",
            append_trailing_newlines=True,
            file_name_suffix=".csv",
            header="time_range,max_temperature,min_temperature,avg_temperature",
        )
    )

<apache_beam.runners.portability.fn_api_runner.fn_runner.RunnerResult at 0x1607c811dc0>

In [None]:
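#### Question: suppose the input contains some outliers (values outside the 0-100 range of the random temperatures). How can we find them without explicit conditional rules such as a direct `if` check?
#### Because the normal readings are uniformly distributed over 0-100, the simplest approach is to compare magnitudes and pick out the values that fall outside that range.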
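#### Statistical methods can also detect outliers, for example:
#### 1. 3-sigma rule: a point whose distance from the mean exceeds three standard deviations (|x - mean| > 3 * std) is likely an outlier.
#### 2. IQR rule: with quartiles Q1 and Q3 and IQR = Q3 - Q1, points below Q1 - 1.5 * IQR or above Q3 + 1.5 * IQR are outliers.
#### 3. Machine-learning methods, such as Random Cut Forest.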
