In [1]:
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkConf
import json
from pyspark import SparkContext,SparkFiles
import numpy as np

In [2]:
def airPlaneRouteTrans(data):
    """Classify a match's airplane flight path into a coarse route label.

    The square map is split into a 3x3 grid numbered row by row::

        1 2 3   (up,     y <= mapSize/3)
        4 5 6   (middle)
        7 8 9   (down,   y >  2*mapSize/3)

    The route line ``y = slope * x + intercept`` is sampled where it reaches
    y = 0, y = mapSize/2 and y = mapSize; each sample is mapped to a grid
    cell and the three cells decide the label.

    Args:
        data: row-like sequence as selected in ``matchDataFrame``:
            (matchId, createdAt, mapName, duration, mapSideLength,
             airplaneRoute_intercept, airplaneRoute_slope,
             blueCircle, whiteCircle).

    Returns:
        (matchId, createdAt, mapName, duration, mapSize, route,
         blueCircle, whiteCircle) where route is one of "Middle", "UpSide",
        "DownSide", "LeftSide", "RightSide", "LeftSide_Middle",
        "RightSide_Middle", "Outer" or "Don't Know".
    """
    mapSize = data[4]
    intercept = data[5]
    slope = data[6]

    def _result(route):
        return (data[0], data[1], data[2], data[3], data[4], route, data[7], data[8])

    # A horizontal route has no single x for y = 0 / mapSize/2 / mapSize,
    # and x = (y - intercept) / slope would divide by zero.
    if slope == 0:
        return _result("Don't Know")

    blockSize = mapSize / 3

    def _column(x):
        # Points off the map on the left (x < 0) belong to the left column,
        # just as points past the right edge fall into the right column.
        if x <= blockSize:
            return 1
        if x <= blockSize * 2:
            return 2
        return 3

    def _rowOffset(y):
        if y <= blockSize:
            return 0  # up
        if y <= blockSize * 2:
            return 3  # middle
        return 6      # down

    blockPos = []
    for y in (0, mapSize / 2, mapSize):
        x = int((y - intercept) / slope)
        blockPos.append(_column(x) + _rowOffset(y))

    start, mid, end = blockPos
    ends = (start, end)
    # Start/end cells on opposite sides of the centre cell.
    crossing = {(2, 8), (8, 2), (4, 6), (6, 4), (1, 9), (9, 1), (3, 7), (7, 3)}
    # Start/end cells in two corners of the same edge.
    sides = {(1, 3): "UpSide", (3, 1): "UpSide",
             (7, 9): "DownSide", (9, 7): "DownSide",
             (1, 7): "LeftSide", (7, 1): "LeftSide",
             (3, 9): "RightSide", (9, 3): "RightSide"}

    route = "Don't Know"
    if mid == 5 or ends in crossing:
        route = "Middle"
    elif ends in sides:
        route = sides[ends]
    elif start in (2, 8) or end in (2, 8):
        route = {4: "LeftSide_Middle", 6: "RightSide_Middle"}.get(mid, "Outer")
    return _result(route)

In [3]:
# Merge match information
def matchDataFrame(matchDF, circlePositionDF, airplaneRouteDF):
    """Combine match metadata, airplane routes and circle positions.

    Only non-custom matches whose matchType is "competitive" are kept.  Each
    joined row is passed through ``airPlaneRouteTrans`` to turn the route
    line (intercept, slope) into a route label.

    Args:
        matchDF: match list with id, createdAt, mapName, duration,
            isCustomMatch and matchType columns.
        circlePositionDF: per-match circles with matchId, blue and white.
        airplaneRouteDF: per-match route with matchId, mapX, intercept, slope.

    Returns:
        DataFrame with columns matchId, createAt, mapName, duration, mapSize,
        airplaneRoute, blueCircle, whiteCircle.  ('createAt' is spelled as
        downstream code expects it.)
    """
    # Match info.  Filter before projecting: the filter columns are not in
    # the selected output, so filtering afterwards depended on Spark
    # re-resolving columns that had already been dropped.
    matchDF = matchDF.filter((col("isCustomMatch")==False) & (col("matchType")=="competitive")) \
        .select('id','createdAt','mapName','duration')

    matchInfoDF = matchDF.join(airplaneRouteDF,matchDF.id==airplaneRouteDF.matchId,"inner") \
        .join(circlePositionDF,matchDF.id==circlePositionDF.matchId,"inner") \
        .select(matchDF.id.alias("matchId"),matchDF.createdAt,matchDF.mapName,matchDF.duration \
                , airplaneRouteDF.mapX.alias("mapSideLength"), airplaneRouteDF.intercept.alias("airplaneRoute_intercept") \
                , airplaneRouteDF.slope.alias("airplaneRoute_slope") \
                , circlePositionDF.blue.alias("blueCircle"),circlePositionDF.white.alias("whiteCircle"))

    tempRDD = matchInfoDF.rdd.map(airPlaneRouteTrans)
    matchInfoDF = tempRDD.toDF(['matchId','createAt','mapName','duration','mapSize',\
                                'airplaneRoute','blueCircle','whiteCircle'])
    return matchInfoDF

In [4]:
def findPos(data):
    """Return the last non-None entry of ``data``, or 0 if there is none.

    Used as a UDF on the landing ``x``/``y`` columns in ``playerDataFrame``.
    A null column value (``data is None``) is treated like an empty sequence
    instead of raising ``TypeError``.

    NOTE(review): that UDF is declared with DoubleType; the int fallback 0
    presumably surfaces as null there and is then removed by the caller's
    ``na.drop()`` — confirm this is intended.
    """
    pos = 0
    if data is None:
        return pos
    for value in data:
        if value is not None:
            pos = value
    return pos

In [5]:
# Convert a position track into total distance travelled
def DistancePlayer(position):
    """Total path length and last timestamp of a position track.

    ``position`` is a sequence of samples whose index 0 is the time and
    indices 1 and 2 are the x / y coordinates.  Consecutive samples are
    joined by straight segments; segment lengths are divided by 102.4
    (presumably a units-to-metres scale — TODO confirm), summed and
    rounded to 10 decimals.

    Returns:
        (moveDistance, liveTime); liveTime is the time of the last sample,
        or 0 for an empty track.
    """
    import builtins
    # The star import from pyspark.sql.functions shadows abs/round.
    pyAbs, pyRound = builtins.abs, builtins.round

    liveTime = position[-1][0] if len(position) != 0 else 0
    total = 0
    for prev, cur in zip(position, position[1:]):
        dx = cur[1] - prev[1]
        dy = cur[2] - prev[2]
        total += ((pyAbs(dx) ** 2) + (pyAbs(dy) ** 2)) ** (1 / 2) / 102.4
    return pyRound(total, 10), liveTime

In [6]:
# Convert a position track into distances relative to the circles
def DistancePlayerAndCircle(blueCircle,whiteCircle,position):
    """Per-sample, time-weighted distances between a player and the circles.

    For each position sample, the snapshot index used is the first
    ``whiteCircle`` entry whose time (index 0) is later than the sample
    time, or the last entry if none is later.  The same index is applied to
    ``blueCircle``, so the two lists are assumed to be aligned
    entry-for-entry (TODO confirm upstream).  Circle entries are read as
    (time, x, y, _, radius): index 4 is used as the radius.

    Both values are scaled by ``1 - weight``.  weight starts at 0.7 and is
    multiplied by 0.7 at every 18th sample, so later samples count more.

    Args:
        blueCircle: sequence of blue-circle snapshots.
        whiteCircle: sequence of white-circle snapshots.
        position: sequence of (time, x, y, ...) samples.

    Returns:
        (WhiteDistance, BlueDistance): lists of (time, value) tuples rounded
        to 10 decimals.  Blue values are (radius - distance) / radius of the
        blue circle.  White values are the distance to the white centre
        divided by the blue radius; they are skipped while the white radius
        is 0.  Both lists are empty when ``whiteCircle`` is empty.
    """
    import builtins
    # Override spark.abs / spark.round (from the star import) with the builtins.
    abs = builtins.abs
    round = builtins.round

    WhiteDistance = []
    BlueDistance = []
    if not whiteCircle:
        # No snapshot to compare against; indexing would otherwise fail.
        return WhiteDistance, BlueDistance

    # Loop-invariant: snapshot times and the fallback (last) index.
    circleTimes = [tm[0] for tm in whiteCircle]
    lastIdx = len(circleTimes) - 1

    weight = 0.7
    for cnt, pos in enumerate(position):
        Ptime = pos[0]
        point = next((i for i, Ctime in enumerate(circleTimes) if Ptime < Ctime), lastIdx)
        posX = pos[1]
        posY = pos[2]

        if cnt % 18 == 0 and cnt != 0:
            weight *= 0.7

        WcircleX = whiteCircle[point][1]
        WcircleY = whiteCircle[point][2]
        WcircleR = whiteCircle[point][4]

        BcircleX = blueCircle[point][1]
        BcircleY = blueCircle[point][2]
        BcircleR = blueCircle[point][4]

        BDistance = (BcircleR - ((abs(BcircleX-posX)**2)+(abs(BcircleY-posY)**2))**(1/2))/BcircleR
        BlueDistance.append( (Ptime, round(BDistance * (1-weight),10)) )

        # NOTE(review): the guard tests the white radius but the division uses
        # the blue radius — confirm which normalisation is intended.
        if WcircleR != 0:
            WDistance = ((abs(WcircleX-posX)**2)+(abs(WcircleY-posY)**2))**(1/2) / BcircleR
            WhiteDistance.append( (Ptime,round(WDistance * (1-weight),10)) )

    return WhiteDistance, BlueDistance

In [7]:
def summaryDistance(data):
    """Summarise one player's position track in one match.

    ``data`` is a row of (matchId, mapName, duration, blueCircle,
    whiteCircle, playerName, ranking, position), as selected in
    ``playerDataFrame``.

    Returns:
        (matchId, mapName, duration, playerName, ranking, liveTime,
         moveDistance, WhiteDistance, BlueDistance).
    """
    (matchId, mapName, duration, blueCircle, whiteCircle,
     playerName, ranking, position) = (data[i] for i in range(8))
    travelled, lastSeen = DistancePlayer(position)
    whiteTrack, blueTrack = DistancePlayerAndCircle(blueCircle, whiteCircle, position)
    return (matchId, mapName, duration, playerName, ranking,
            lastSeen, travelled, whiteTrack, blueTrack)

In [8]:
def combatDistance(a_x,a_y,a_z,v_x,v_y,v_z):
    """3-D distance between attacker (a_*) and victim (v_*) positions.

    The planar (x/y) distance is computed first and then combined with the
    height difference.  The result is divided by 102.4 (presumably a
    units-to-metres scale — TODO confirm) and rounded to 3 decimals.
    """
    import builtins
    # The star import from pyspark.sql.functions shadows abs/round.
    absolute = builtins.abs
    planar = ((absolute(a_x - v_x) ** 2) + (absolute(a_y - v_y) ** 2)) ** (1 / 2)
    spatial = ((planar ** 2) + (absolute(a_z - v_z) ** 2)) ** (1 / 2)
    return builtins.round(spatial / 102.4, 3)

In [9]:
def combatPosition(time,blueCircle,a_x,a_y):
    """Planar distance from an attacker to the blue-circle centre at ``time``.

    ``blueCircle`` is read as a sequence of snapshots (time, x, y, ...),
    assumed to be ordered by time.  Entry i is used when the next entry's
    time is >= ``time``, i.e. ``time`` falls in (t_i, t_{i+1}].  Times up to
    the second snapshot map to the first one, and times after the last
    snapshot use the last one.

    Returns:
        The distance divided by 102.4, or None when ``blueCircle`` is empty.
    """
    import builtins
    # The star import from pyspark.sql.functions shadows abs.
    abs = builtins.abs
    if not blueCircle:
        return None

    # Fallback for events after the final snapshot.
    chosen = blueCircle[-1]
    for current, following in zip(blueCircle, blueCircle[1:]):
        if following[0] >= time:  # time lies between current and following
            chosen = current
            break

    b_x = chosen[1]
    b_y = chosen[2]
    return ((abs(b_x-a_x)**2)+(abs(b_y-a_y)**2))**(1/2)/102.4

In [10]:
def summaryCombat(data):
    """Summarise one attack event.

    ``data`` is a row of (matchId, mapName, blueCircle, attacker, a_x, a_y,
    a_z, v_x, v_y, v_z, time), as selected in ``playerDataFrame``.

    Returns:
        (matchId, mapName, attacker, time, distance, position), where
        distance comes from ``combatDistance`` (attacker to victim) and
        position comes from ``combatPosition`` (attacker to blue circle).
    """
    matchId, mapName, blueCircle, attacker = data[0], data[1], data[2], data[3]
    ax, ay, az = data[4], data[5], data[6]
    vx, vy, vz = data[7], data[8], data[9]
    time = data[10]
    return (matchId, mapName, attacker, time,
            combatDistance(ax, ay, az, vx, vy, vz),
            combatPosition(time, blueCircle, ax, ay))

In [11]:
def timeSet(data):
    """Seconds elapsed between a match's creation and an event.

    ``data`` is a row of (matchId, mapName, playerName, createAt, time),
    where createAt and time are timestamp strings such as
    '2018-01-01T13:05:31Z' or '2018-01-01T13:06:21.342Z'.

    Only the clock part (HH:MM:SS after the 'T') is used.  The date is
    ignored, and the difference wraps modulo 24 hours, so an event just past
    midnight still yields a small positive offset.  Fractional seconds are
    truncated.

    Returns:
        (matchId, mapName, playerName, seconds).
    """
    import re
    # One pattern for both timestamps, with or without fractional seconds
    # or a trailing 'Z'.  Format-specific patterns raised IndexError when a
    # timestamp did not match exactly.
    clock = re.compile(r'T([0-9]{2}):([0-9]{2}):([0-9]{2})')

    def toSeconds(stamp):
        hour, minute, second = clock.search(stamp).groups()
        return int(hour) * 3600 + int(minute) * 60 + int(second)

    elapsed = (toSeconds(data[4]) - toSeconds(data[3])) % (24 * 3600)
    return (data[0], data[1], data[2], elapsed)

In [12]:
def _loadAreaPolygons(path):
    """Load a JSON mapping of area name -> vertex list into shapely Polygons.

    Vertex coordinates are multiplied by 100 before building each polygon
    (presumably to match the telemetry coordinate scale — TODO confirm).
    The file handle is closed as soon as the JSON is parsed.
    """
    from shapely.geometry.polygon import Polygon
    with open(path) as fp:
        rawPolygons = json.load(fp)
    return {name: Polygon((np.array(pos) * 100).tolist()) for name, pos in rawPolygons.items()}


def _landingPlace(areas, x, y):
    """Name of the polygon in ``areas`` containing (x, y), else 'another'.

    The loop does not stop early, so when several polygons contain the point
    the last one in iteration order wins.
    """
    from shapely.geometry import Point
    point = Point(x, y)
    localName = 'another'
    for name, polygon in areas.items():
        if polygon.contains(point):
            localName = name
    return localName


def playerDataFrame(playerLandingDF,playerPositionDF,AttackDF,rideVehicleDF,leaveVehicleDF,matchInfoDF):
    """Build one row per (match, player) of landing/movement/combat/vehicle features.

    Args:
        playerLandingDF: landing records (matchId, playerName, ranking, x, y).
        playerPositionDF: position tracks (matchId, playerName, ranking, position).
        AttackDF: attack records (matchId, attacker, a_x..v_z, time).
        rideVehicleDF, leaveVehicleDF: vehicle records (matchId, player, time).
        matchInfoDF: output of ``matchDataFrame``.

    Returns:
        DataFrame with matchId, mapName, playerName, ranking, liveTime,
        landingPlace, moveDistance, WhiteCircleDistance, BlueCircleDistance,
        attacks, firstRideTime, lastLeaveTime.  The final ``na.fill(0)``
        replaces nulls left by the left-outer joins in numeric columns.

    Side effects: reads Erangel_LocalPos_Polygon.json and
    Miramar_LocalPos_Polygon.json from the working directory, collects the
    landing rows to the driver, and uses the notebook-global ``spark``.
    """
    convertUDF = udf(lambda x:findPos(x),DoubleType())

    # Landing data
    playerLandingDF = playerLandingDF.select(col('matchId'),col('playerName'),col('ranking'),convertUDF(col('x')).alias('xPos')\
                                         ,convertUDF(col('y')).alias('yPos')).distinct()
    playerLandingDF = playerLandingDF.join(matchInfoDF, playerLandingDF['matchId']==matchInfoDF['matchId'],'inner')\
            .select(matchInfoDF['matchId'], matchInfoDF['mapName'], playerLandingDF['ranking'] \
             ,playerLandingDF['playerName'], playerLandingDF['xPos'], playerLandingDF['yPos']).na.drop()

    # Area polygons, keyed by the map names used in the match data
    areasByMap = {
        'Baltic_Main': _loadAreaPolygons("Erangel_LocalPos_Polygon.json"),
        'Desert_Main': _loadAreaPolygons("Miramar_LocalPos_Polygon.json"),
    }

    # Replace landing coordinates with the name of the area they fall in
    tempData = []
    for data in playerLandingDF.collect():
        areas = areasByMap.get(data[1])
        localName = 'another' if areas is None else _landingPlace(areas, data[4], data[5])
        tempData.append((data[0],data[1],data[2],data[3],localName))

    playerLandingDF = spark.createDataFrame(tempData,['matchId','mapName','ranking','playerName','landingPlace'])

    # Player movement
    tempDF = playerPositionDF.alias('Pos').join(matchInfoDF.alias('Matches') \
             ,playerPositionDF['matchId']==matchInfoDF['matchId'],'inner')  \
             .select('Matches.matchId','Matches.mapName','Matches.duration','Matches.blueCircle','Matches.whiteCircle'\
             ,'Pos.playerName','Pos.ranking','Pos.position')
    tempRDD = tempDF.rdd.map(summaryDistance)
    playerDistanceDF = tempRDD.toDF(['matchId','mapName','duration','playerName' \
                             ,'ranking','liveTime','moveDistance','WhiteCircleDistance','BlueCircleDistance'])

    # Attack data
    tempDF = AttackDF.distinct().alias('Atk').join(matchInfoDF.alias('MI') \
             ,AttackDF['matchId']==matchInfoDF['matchId'],'inner') \
             .select('MI.matchId','MI.mapName','MI.blueCircle','Atk.attacker' \
             ,'Atk.a_x','Atk.a_y','Atk.a_z','Atk.v_x','Atk.v_y','Atk.v_z','Atk.time')
    tempRDD = tempDF.rdd.map(summaryCombat)
    playerCombatDF = tempRDD.toDF(['matchId','mapName','playerName' \
                             ,'attackTime','attackDistance','BlueCircleDistance'])
    # Group attack records per attacker (reduceByKey).
    # NOTE(review): the shuffle in reduceByKey may not keep the preceding
    # orderBy, so `attacks` is presumably not guaranteed to be time-ordered.
    tempRDD = playerCombatDF.orderBy('attackTime') \
                        .rdd.map(lambda row : ((row[0],row[1],row[2]),[(row[3],row[4],row[5])])) \
                            .reduceByKey(lambda x,y: x+y) \
                            .map(lambda row : (row[0][0],row[0][1],row[0][2],row[1]))
    playerCombatDF = tempRDD.toDF(['matchId','mapName','playerName','attacks'])

    # Vehicle boarding
    tempDF = rideVehicleDF.alias('RV').join(matchInfoDF.alias('MI') \
             ,rideVehicleDF['matchId']==matchInfoDF['matchId'],'inner')  \
             .select('RV.matchId','MI.mapName',col('RV.player').alias('playerName'),'MI.createAt'\
             ,'RV.time')
    tempRDD = tempDF.rdd.map(timeSet)
    rideVehicleTimeDF = tempRDD.toDF(['matchId','mapName','playerName','rideTime'])
    # Vehicle leaving
    tempDF = leaveVehicleDF.alias('LV').join(matchInfoDF.alias('MI') \
             ,leaveVehicleDF['matchId']==matchInfoDF['matchId'],'inner')  \
             .select('LV.matchId','MI.mapName',col('LV.player').alias('playerName'),'MI.createAt'\
             ,'LV.time')
    tempRDD = tempDF.rdd.map(timeSet)
    leaveVehicleTimeDF = tempRDD.toDF(['matchId','mapName','playerName','leaveTime'])

    # Combine: first boarding and last leaving time per player
    # (min/max here are the Spark aggregate functions from the star import)
    playerVehicleRideDF = rideVehicleTimeDF.groupBy('matchId','mapName','playerName') \
                                        .agg(min('rideTime').alias('firstRideTime'))
    playerVehicleLeaveDF = leaveVehicleTimeDF.groupBy('matchId','mapName','playerName') \
                                        .agg(max('leaveTime').alias('lastLeaveTime'))
    playerVehicleDF = playerVehicleRideDF.alias('RV').join(playerVehicleLeaveDF.alias('LV') \
                                    ,((playerVehicleRideDF['matchId']==playerVehicleLeaveDF['matchId']) \
                                      &(playerVehicleRideDF['mapName']==playerVehicleLeaveDF['mapName']) \
                                      &(playerVehicleRideDF['playerName']==playerVehicleLeaveDF['playerName'])),'inner') \
                                    .select('RV.matchId','RV.mapName','RV.playerName' \
                                            ,'RV.firstRideTime','LV.lastLeaveTime')

    # Final result
    matchPlayerDF = playerLandingDF.alias('F').join(playerDistanceDF.alias('S') \
                                    ,((playerLandingDF['matchId']==playerDistanceDF['matchId']) \
                                      &(playerLandingDF['mapName']==playerDistanceDF['mapName']) \
                                      &(playerLandingDF['playerName']==playerDistanceDF['playerName'])),'inner') \
                            .select('F.matchId','F.mapName','F.playerName','F.ranking','S.liveTime' \
                            ,'F.landingPlace','S.moveDistance','S.WhiteCircleDistance','S.BlueCircleDistance').alias('F') \
                                    .join(playerCombatDF.alias('S') \
                                    ,((col('F.matchId')==playerCombatDF['matchId']) \
                                      &(col('F.mapName')==playerCombatDF['mapName']) \
                                      &(col('F.playerName')==playerCombatDF['playerName'])),'leftouter') \
                            .select('F.*','S.attacks').alias('F').join(playerVehicleDF.alias('S') \
                                    ,((col('F.matchId')==playerVehicleDF['matchId']) \
                                      &(col('F.mapName')==playerVehicleDF['mapName']) \
                                      &(col('F.playerName')==playerVehicleDF['playerName'])),'leftouter') \
                            .select('F.*','S.firstRideTime','S.lastLeaveTime').na.fill(0)

    return matchPlayerDF

In [13]:
# Group final placements into rank tiers
def splitrank(data):
    """Bucket a final placement into a rank group.

    Lower placement numbers are better, as the "HighRank" bucket for
    placements <= 3 implies.  Placements 4-9 map to "MiddleRank" and
    placements >= 10 map to "LowRank".  The labels for those two ranges
    were previously swapped.
    """
    if data <= 3:
        return "HighRank"
    if data < 10:
        return "MiddleRank"
    return "LowRank"

In [14]:
def newData(matchInfoDF,matchPlayerDF):
    """Join match-level and player-level features and bucket the ranking.

    Args:
        matchInfoDF: output of ``matchDataFrame``.
        matchPlayerDF: output of ``playerDataFrame``.

    Returns:
        DataFrame with matchId, mapName, playerName, airplaneRoute,
        landingPlace, ranking (bucketed by ``splitrank``), liveTime,
        moveDistance, WhiteCircleDistance, BlueCircleDistance, attacks,
        firstRideTime, lastLeaveTime.
    """
    # Combine match info with player info
    newDF = matchInfoDF.alias('MI').join(matchPlayerDF.alias('LP'),["matchId","mapName"],'inner')\
                          .select('MI.matchId','MI.mapName','LP.playerName','MI.airplaneRoute','LP.landingPlace','LP.ranking'\
                                 ,'LP.liveTime','LP.moveDistance','LP.WhiteCircleDistance','LP.BlueCircleDistance'\
                                 ,'LP.attacks','LP.firstRideTime','LP.lastLeaveTime')
    # Replace the numeric ranking with its rank group.  The projection above
    # already yields plain column names, so no 'LP.' qualifier is used here.
    rankUDF = udf(splitrank, StringType())
    newDF = newDF.select('matchId','mapName','playerName','airplaneRoute','landingPlace' \
                         ,rankUDF(col('ranking')).alias('ranking'),'liveTime','moveDistance' \
                         ,'WhiteCircleDistance','BlueCircleDistance','attacks' \
                         ,'firstRideTime' ,'lastLeaveTime')

    return newDF

In [15]:
# Run the whole pipeline for one numbered data set.
# NOTE(review): input() blocks "Restart & Run All"; consider a config constant.
from pyspark.sql.utils import AnalysisException

numberData = input("데이터 번호를 입력 하시오 : ")
try:
    # Matches
    matchDF = spark.read.format("json").load("/sparkdata/PUBG/MatchesList{}.json".format(numberData)).distinct()
    circlePositionDF = spark.read.format("json").load("/sparkdata/PUBG/Matches{}_CirclePosition.json".format(numberData)).distinct()
    airplaneRouteDF = spark.read.format("json").load("/sparkdata/PUBG/Matches{}_AirPlaneRoute.json".format(numberData)).distinct()
    airplaneRouteDF = airplaneRouteDF.na.drop()
    matchInfoDF = matchDataFrame(matchDF, circlePositionDF, airplaneRouteDF)
    # Players
    playerLandingDF = spark.read.format("json").load("/sparkdata/PUBG/Matches{}_PlayerLading.json".format(numberData)).distinct()
    playerPositionDF = spark.read.format('json').load("/sparkdata/PUBG/Matches{}_PlayerPosition_Baltic_Main.json".format(numberData)).distinct()
    AttackDF = spark.read.format('json').load("/sparkdata/PUBG/Matches{}_AttackerList.json".format(numberData))
    rideVehicleDF = spark.read.format('json').load("/sparkdata/PUBG/Matches{}_RideVehicle.json".format(numberData))
    leaveVehicleDF = spark.read.format('json').load("/sparkdata/PUBG/Matches{}_LeaveVehicle.json".format(numberData))
    matchPlayerDF = playerDataFrame(playerLandingDF,playerPositionDF,AttackDF,rideVehicleDF,leaveVehicleDF,matchInfoDF)
except FileNotFoundError as e:
    # Raised for the local polygon JSON files.
    print(e,' 파일이 없습니다.')
except AnalysisException as e:
    # spark.read reports a missing path as AnalysisException, not
    # FileNotFoundError.  Any other analysis error is a real bug: re-raise it.
    if 'Path does not exist' not in str(e):
        raise
    print(e,' 파일이 없습니다.')
else:
    # Only build the output when every input loaded.  A distinct name keeps
    # the newData() function intact, so the cell can be re-run.
    newDataDF = newData(matchInfoDF,matchPlayerDF)
    newDataDF.printSchema()
    newDataDF.show(10)
    newDataDF.toPandas().to_json("matchPlayer{}.json".format(numberData),orient = 'records')
    print('파일 저장 완료')

데이터 번호를 입력 하시오 : 1


KeyboardInterrupt: 