### 数据探查

#### 数据加载和初识

In [7]:
import pandas as pd
import numpy as np
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

# Load the .parquet file into a pandas DataFrame.
df = pd.read_parquet("../interview.parquet")

# Report the row/column counts and the column names.
print(f"shape：{df.shape!s}")
print(f"columns:{df.columns!s}")

# Show the first three rows.
preview = df.head(3)
print(f"前三行数据内容：\n{preview!s}")


shape：(704669, 12)
columns:Index(['LocalTime', 'TradingDay', 'InstrumentID', 'ExchangeID', 'LastPrice',
       'Volume', 'Turnover', 'UpperLimitPrice', 'LowerLimitPrice',
       'UpdateTime', 'UpdateMillisec', 'ActionDay'],
      dtype='object')
前三行数据内容：
             LocalTime TradingDay InstrumentID ExchangeID  LastPrice  Volume  \
0  1640089802520605952   20211222        y2212        DCE     8066.0       0   
1  1640089802520635092   20211222        v2212        DCE     8231.0       0   
2  1640089802520668444   20211222       ag2212       SHFE     4868.0       0   

   Turnover  UpperLimitPrice  LowerLimitPrice UpdateTime  UpdateMillisec  \
0       0.0           8564.0           7444.0   18:34:55               0   
1       0.0           8806.0           7654.0   18:34:55               0   
2       0.0           5331.0           4362.0   18:35:30             400   

  ActionDay  
0  20211222  
1  20211222  
2  20211221  


##### 统计LocalTime的单调性
检查是否存在LocalTime乱序情况

In [8]:
# Count how many times LocalTime decreases from one row to the next.
# The comparison is positional (row i vs. row i-1 in file order) and
# vectorized: it does not rely on the DataFrame having a default 0..n-1
# index, and it avoids a Python-level loop with per-element Series lookups.
local_times = df['LocalTime'].to_numpy()
count = int((local_times[1:] < local_times[:-1]).sum())

if count == 0:
    print("LocalTime不存在乱序")
else:
    print("LocalTime存在乱序，乱序次数："+str(count))


LocalTime不存在乱序


##### 分交易所统计(UpdateTime,UpdateMillisec)单调性

In [10]:
from datetime import datetime as dt

def _count_update_time_disorder(df, exchangeid):
    """Return how many consecutive rows of one exchange step backwards in (UpdateTime, UpdateMillisec)."""
    # Keep only this exchange's rows, preserving their order in the frame.
    rows = df.loc[df['ExchangeID'] == exchangeid, ['UpdateTime', 'UpdateMillisec']]
    if len(rows) < 2:
        # Zero or one row: there is no adjacent pair to compare.
        return 0

    # Parse each timestamp once; work on plain arrays so that the comparison is
    # positional and independent of the DataFrame's index labels.
    times = pd.to_datetime(rows['UpdateTime'], format="%H:%M:%S").to_numpy()
    millis = rows['UpdateMillisec'].astype('int64').to_numpy()

    earlier_time = times[1:] < times[:-1]
    same_time_earlier_ms = (times[1:] == times[:-1]) & (millis[1:] < millis[:-1])
    return int((earlier_time | same_time_earlier_ms).sum())


def check_update_time(df, exchangeid):
    """Print whether (UpdateTime, UpdateMillisec) is non-decreasing for one exchange.

    Rows whose ExchangeID equals ``exchangeid`` are taken in the order they
    appear in ``df``; every row is compared with the previous row of the same
    exchange, starting from the very first matching row (including row 0 of
    the frame). A pair counts as out of order when its UpdateTime is earlier,
    or when UpdateTime is equal and UpdateMillisec is smaller.

    Only the time of day ("%H:%M:%S") is compared, so a step across midnight
    (e.g. 23:59:59 -> 00:00:00) is counted as a backwards step.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain the columns 'ExchangeID', 'UpdateTime' and 'UpdateMillisec'.
    exchangeid : str
        Exchange identifier to filter on; also used as the prefix of the
        printed message.

    Returns
    -------
    None
        The result is reported via ``print`` only.
    """
    disorder_cnt = _count_update_time_disorder(df, exchangeid)

    if disorder_cnt == 0:
        print(exchangeid+"不存在乱序")
    else:
        print(exchangeid+"存在乱序，乱序次数：" + str(disorder_cnt))
    return

# Run the ordering check once per exchange.
# Counts noted from an earlier run: INE 1, CZCE 7025, DCE 53399, SHFE 1.
for exchange_id in ('INE', 'CZCE', 'DCE', 'SHFE'):
    check_update_time(df, exchange_id)

#
# from pyspark import SparkConf, SparkContext
# from pyspark.sql import SQLContext
#
# sc = SparkContext()
# sqlContext = SQLContext(sc)
# data = sqlContext.read.parquet("../interview.parquet")
# print(data.schema)
# print(data.select("TradingDay").agg({"TradingDay": "min"}).show()) #TradingDay min 20211221 max 20211222
# print(data.select("ActionDay").agg({"ActionDay": "min"}).show()) #ActionDay min 20211221 max 20211222
# print(data.select("*").where("TradingDay<ActionDay").count()) # 293,802 不相等 #TradingDay比ActionDay大1天？
#
# ine = data.select("*").where("ExchangeID='INE' and InstrumentID ='lu2212'")
#
# print("======")
# # data.select("*").where(ExchangeID=INE,)
# print(data.select("*").where("ExchangeID='INE' and InstrumentID ='lu2212'").tail(30)) # 293,802 不相等 #TradingDay比ActionDay大1天？
#



INE存在乱序，乱序次数：1
CZCE存在乱序，乱序次数：7025
DCE存在乱序，乱序次数：53399
SHFE存在乱序，乱序次数：1
