<a href="https://colab.research.google.com/github/hyweisky/gwings/blob/master/SlidingWindowAveragerator.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
from PIL import Image
import requests
from zipfile import ZipFile
from io import BytesIO
import matplotlib.pyplot as plt


In [None]:
class SlidingWindowAveragerator(object):
	### YOUR CODE HERE
	def __init__(self,w_size):
		self.size=w_size
		self.n=0
		self.xs=[]#存放所有的数
		self.sum_all=0 #所有数的和
		self.sum_n=[] #存放前n个数的和
		self.sum_sq_n=[] #存放前n个数的平方和
		self.sum_size=0 #最近size个数的和=所有数的和减去前n-size个数的和
		self.sum_sq_size=0 #最近size个数的平方和=所有数的平方和减去前n-size个数的平方和
		self.total=0	#参与计算平均数和方差的数的总个数，等于n或size
		self.sum_x_sq=0 #所有数的平方和
	def add(self,x):
		self.xs.append(x)
		self.sum_all+=x
		self.sum_x_sq+=x*x
		self.n+=1
		self.sum_n.append(self.sum_all)
		self.sum_sq_n.append(self.sum_x_sq)
		if self.n>self.size:
			self.sum_size=self.sum_all-self.sum_n[self.n-self.size-1]
			self.sum_sq_size=self.sum_x_sq-self.sum_sq_n[self.n-self.size-1]
			self.total=self.size
		else:
			self.sum_size=self.sum_all
			self.sum_sq_size=self.sum_x_sq
			self.total=self.n
	@property
	def avg(self):
		return self.sum_size/self.total
	@property
	def std(self):
		mu=self.avg
		return np.sqrt(self.sum_sq_size/self.total-mu*mu)


In [None]:
class CleanData(object):
	def __init__(self,discount_factor):
		self.a=DiscountedAveragerator(discount_factor)
	

	def filter(self,x,num_stdevs=2.):
		self.a.add(x)
		std=self.a.std
		run_avg=self.a.avg
		return x if run_avg-num_stdevs*std<x<run_avg+num_stdevs*std else run_avg


In [None]:
class MotionDetection(object):
	def __init__(self,num_sigmas=4,discount=0.96):
		'''
		@param num_sigmas:
		@param discount:dicount factor for the averagerator
		'''
		self.discount=discount
		self.num_sigmas=num_sigmas

	def detect_motion(self,img):
		'''
		@param img:h*w*3 image,是给定高度h，宽度w，3通道的3维数组：h*w*3
		@return an h*w boolean matrix.如果当前元素的颜色值：与四个元素的颜色值有偏离，
		则认为检测到了motion
		程序则对img三维矩阵中的第3维颜色值使用DiscountedAveragerator进行计算，
		然后把每个像素值按【u-k*teta，u-k*teta】
		看是否超过了此范围，若是，则认为发生了偏离
		'''
		h=img.shape[0]
		w=img.shape[1]
		c=img.shape[2]
		x=np.ones([h,w,c])*0.96
		g=DiscountedAveragerator(x)
		g.add(img[:,:,0:1])
		g.add(img[:,:,1:2])
		g.add(img[:,:,-1:])
		avgs,stds=g.avg,g.std
		aa=img>avgs+self.num_sigmas*stds
		bb=img<avgs-self.num_sigmas*stds
		result=np.max(np.logical_or(aa,bb),axis=2)
		return result

In [None]:
#start：获取测试图片数据：放到一个cell中运行
from PIL import Image
import requests
from zipfile import ZipFile
from io import BytesIO
import matplotlib.pyplot as plt
#Gets the zip file
ZIP_URL="https://storage.googleapis.com/lucadealfaro-share/GardenSequence.zip"
r=requests.get(ZIP_URL)
# List of images
images_as_arrays=[]
# Makes a file object of the result
with ZipFile(BytesIO(r.content)) as myzip:
	for fn in myzip.namelist():
		with myzip.open(fn) as my_image_file:
			img=Image.open(my_image_file)
			images_as_arrays.append(np.array(img).astype(np.float32))


In [None]:
#测试获取到的图片数据
print(images_as_arrays[0].shape)
a=DiscountedAveragerator(0.96)
img=images_as_arrays[10]
print(img)
print(img[0][0])
h=img.shape[0]
w=img.shape[1]
c=img.shape[2]
print(h,w,c)
# a.add(img)
# # print("min:",a.avg-4*a.std,"max:",a.avg+4*a.std)
# # print("avg:",a.avg,"std",a.std)
# aa=img>a.avg+4*a.std
# bb=img<a.avg-4*a.std
# print(aa)

# cc=np.logical_or(aa,bb)
# dd=np.max(cc,axis=2)
# print(dd)
# plt.imshow(bb,cmap="gnuplot")
# plt.colorbar()
# plt.show()




In [None]:
#定义方法
def detect_motion(image_list, num_sigmas=4., discount=0.96):
    """Takes as input:
    @param image_list: a list of images, all of the same size.
    @param num_sigmas: a parameter specifying how many standard deviations a
        pixel should be to count as detected motion.
    @param discount: the discount factor for the averagerator.
    """
    detector = MotionDetection(num_sigmas=num_sigmas, discount=discount)
    detected_motion = []
    for i, img in enumerate(image_list):
        motion = detector.detect_motion(img)
        # print(np.sum(motion))
        if np.sum(motion) > 500:
            detected_motion.append((i, motion))
    return detected_motion


In [None]:
# Compute the motion detections.
motions = detect_motion(images_as_arrays[:60])
#print(images_as_arrays[0])
# print("motions.shape",motions)
motion_idxs = [i for i, _ in motions]
print(motion_idxs)
#assert motion_idxs == [1, 10, 47, 48, 49, 57, 58, 59]
# assert np.sum(motions[6][1]) == 1199
print(np.sum(motions[0][1]))

# print(np.sum(motions[6][1]))

[2, 3, 4, 6, 8, 10, 12, 17, 19, 20, 21, 22, 23, 24, 26, 27, 28, 30, 31, 32, 33, 34, 36, 38, 41, 42, 43, 44, 45, 47, 49, 50, 56, 57]
586


In [None]:
img=np.random.random((4,5,3))
h=img.shape[0]
w=img.shape[1]
chanel=img.shape[2]
result=np.zeros([h,w],dtype=bool)
print("result:",result)
print("h,w,c",h,w,chanel)
for i in range(h):
  for j in range(w):
    a=DiscountedAveragerator(0.9)
    for c in range(chanel):
      #print(img[i][j][c],"\n")
      a.add(img[i][j][c])
    #print("[a-k*g,a+kg]",a.avg-4*a.std,a.avg+4*a.std)
    result[i][j]=False if a.avg+4*a.std>img[i][j][c]>a.avg-4*a.std else True
    print("result===",result)


In [None]:
img=np.random.random((4,5,3))
img2=np.random.random((4,5,3))
print("a:",img,'\n')
print("b:",img2)
print("a>b=",img>img2)

In [None]:
a=np.random.random((1,5,3))
aa=a>0.8
bb=a<0.2
print("a:",a)
print("aa shape：",aa)
b=np.max(aa,axis=2)
c=np.max(bb,axis=2)
print("b shape：",b.shape)
print("b:",b)
print("c:",c)
print("m:",np.logical_or(b,c))


In [None]:
img=images_as_arrays[1]
# print(img)
x=np.ones([1,1,3])*0.96
print(x.shape)
g=DiscountedAveragerator(x)
g.add(img)
a=g.avg
# h=img.shape[0]
# w=img.shape[1]
# c=img.shape[2]
# r=np.zeros([h,w,c],dtype=bool)
# s=np.zeros([h,w,c],dtype=bool)
# for i in range(h):
#   for j in range(w):
#     g=DiscountedAveragerator(0.96)
#     for k in range(c):
#       g.add(img[i][j][k])
#       # print("value:",img[i][j][k])
#     # print("avg,and std:",g.avg,g.std)
#     r[i][j]=img[i][j]>g.avg+4*g.std
#     s[i][j]=img[i][j]<g.avg-4*g.std
#     if np.sum(s[i][j])+np.sum(r[i][j])>0:
#       print("结果：",img[i][j],r[i][j],g.avg+4*g.std,s[i][j],g.avg-4*g.std)
# result=np.max(np.logical_or(r,s),axis=2)
# print(np.sum(result))


In [None]:
img=images_as_arrays[1]
# print(img[0][0])
# print(img)
x=np.ones([480,640,3])*0.96
g=DiscountedAveragerator(x)
g.add(img[:,:,0:1])
g.add(img[:,:,1:2])
g.add(img[:,:,-1:])
g.avg

In [None]:
class DiscountedAveragerator:
	def __init__(self,alpha):
		self.alpha=alpha
		self.w=0.
		self.sum_x=0.
		self.sum_x_sq=0.
	def add(self,x):
		# print("x:",x)
		self.w=self.alpha*self.w+1.
		self.sum_x=self.alpha*self.sum_x+x
		self.sum_x_sq=self.alpha*self.sum_x_sq+x*x
    
	@property
	def avg(self):
		# print("sum_x=%,w=%",self.sum_x,self.w)
		return self.sum_x/self.w
	@property
	def std(self):
		mu=self.avg
		return np.sqrt(np.maximum(0.,self.sum_x_sq/self.w-mu*mu))

In [None]:
a=np.ones([3,])
b=np.ones([3,])*0.96
print(a*b)

[0.96 0.96 0.96]
